h2/proto/streams/
streams.rs

1use super::recv::RecvHeaderBlockError;
2use super::store::{self, Entry, Resolve, Store};
3use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
4use crate::codec::{Codec, SendError, UserError};
5use crate::ext::Protocol;
6use crate::frame::{self, Frame, Reason};
7use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
8use crate::{client, proto, server};
9
10use bytes::{Buf, Bytes};
11use http::{HeaderMap, Request, Response};
12use std::task::{Context, Poll, Waker};
13use tokio::io::AsyncWrite;
14
15use std::sync::{Arc, Mutex};
16use std::{fmt, io};
17
18#[derive(Debug)]
19pub(crate) struct Streams<B, P>
20where
21    P: Peer,
22{
23    /// Holds most of the connection and stream related state for processing
24    /// HTTP/2 frames associated with streams.
25    inner: Arc<Mutex<Inner>>,
26
27    /// This is the queue of frames to be written to the wire. This is split out
28    /// to avoid requiring a `B` generic on all public API types even if `B` is
29    /// not technically required.
30    ///
31    /// Currently, splitting this out requires a second `Arc` + `Mutex`.
32    /// However, it should be possible to avoid this duplication with a little
33    /// bit of unsafe code. This optimization has been postponed until it has
34    /// been shown to be necessary.
35    send_buffer: Arc<SendBuffer<B>>,
36
37    _p: ::std::marker::PhantomData<P>,
38}
39
40// Like `Streams` but with a `peer::Dyn` field instead of a static `P: Peer` type parameter.
41// Ensures that the methods only get one instantiation, instead of two (client and server)
42#[derive(Debug)]
43pub(crate) struct DynStreams<'a, B> {
44    inner: &'a Mutex<Inner>,
45
46    send_buffer: &'a SendBuffer<B>,
47
48    peer: peer::Dyn,
49}
50
51/// Reference to the stream state
52#[derive(Debug)]
53pub(crate) struct StreamRef<B> {
54    opaque: OpaqueStreamRef,
55    send_buffer: Arc<SendBuffer<B>>,
56}
57
58/// Reference to the stream state that hides the send data chunk generic
59pub(crate) struct OpaqueStreamRef {
60    inner: Arc<Mutex<Inner>>,
61    key: store::Key,
62}
63
64/// Fields needed to manage state related to managing the set of streams. This
65/// is mostly split out to make ownership happy.
66///
67/// TODO: better name
68#[derive(Debug)]
69struct Inner {
70    /// Tracks send & recv stream concurrency.
71    counts: Counts,
72
73    /// Connection level state and performs actions on streams
74    actions: Actions,
75
76    /// Stores stream state
77    store: Store,
78
79    /// The number of stream refs to this shared state.
80    refs: usize,
81}
82
83#[derive(Debug)]
84struct Actions {
85    /// Manages state transitions initiated by receiving frames
86    recv: Recv,
87
88    /// Manages state transitions initiated by sending frames
89    send: Send,
90
91    /// Task that calls `poll_complete`.
92    task: Option<Waker>,
93
94    /// If the connection errors, a copy is kept for any StreamRefs.
95    conn_error: Option<proto::Error>,
96}
97
98/// Contains the buffer of frames to be written to the wire.
99#[derive(Debug)]
100struct SendBuffer<B> {
101    inner: Mutex<Buffer<Frame<B>>>,
102}
103
104// ===== impl Streams =====
105
106impl<B, P> Streams<B, P>
107where
108    B: Buf,
109    P: Peer,
110{
111    pub fn new(config: Config) -> Self {
112        let peer = P::r#dyn();
113
114        Streams {
115            inner: Inner::new(peer, config),
116            send_buffer: Arc::new(SendBuffer::new()),
117            _p: ::std::marker::PhantomData,
118        }
119    }
120
121    pub fn set_target_connection_window_size(&mut self, size: WindowSize) -> Result<(), Reason> {
122        let mut me = self.inner.lock().unwrap();
123        let me = &mut *me;
124
125        me.actions
126            .recv
127            .set_target_connection_window(size, &mut me.actions.task)
128    }
129
130    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
131        let mut me = self.inner.lock().unwrap();
132        let me = &mut *me;
133        me.actions.recv.next_incoming(&mut me.store).map(|key| {
134            let stream = &mut me.store.resolve(key);
135            tracing::trace!(
136                "next_incoming; id={:?}, state={:?}",
137                stream.id,
138                stream.state
139            );
140            // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
141            // the lock, so it can't.
142            me.refs += 1;
143
144            // Pending-accepted remotely-reset streams are counted.
145            if stream.state.is_remote_reset() {
146                me.counts.dec_num_remote_reset_streams();
147            }
148
149            StreamRef {
150                opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
151                send_buffer: self.send_buffer.clone(),
152            }
153        })
154    }
155
156    pub fn send_pending_refusal<T>(
157        &mut self,
158        cx: &mut Context,
159        dst: &mut Codec<T, Prioritized<B>>,
160    ) -> Poll<io::Result<()>>
161    where
162        T: AsyncWrite + Unpin,
163    {
164        let mut me = self.inner.lock().unwrap();
165        let me = &mut *me;
166        me.actions.recv.send_pending_refusal(cx, dst)
167    }
168
169    pub fn clear_expired_reset_streams(&mut self) {
170        let mut me = self.inner.lock().unwrap();
171        let me = &mut *me;
172        me.actions
173            .recv
174            .clear_expired_reset_streams(&mut me.store, &mut me.counts);
175    }
176
177    pub fn poll_complete<T>(
178        &mut self,
179        cx: &mut Context,
180        dst: &mut Codec<T, Prioritized<B>>,
181    ) -> Poll<io::Result<()>>
182    where
183        T: AsyncWrite + Unpin,
184    {
185        let mut me = self.inner.lock().unwrap();
186        me.poll_complete(&self.send_buffer, cx, dst)
187    }
188
189    pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
190        let mut me = self.inner.lock().unwrap();
191        let me = &mut *me;
192
193        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
194        let send_buffer = &mut *send_buffer;
195
196        me.counts.apply_remote_settings(frame);
197
198        me.actions.send.apply_remote_settings(
199            frame,
200            send_buffer,
201            &mut me.store,
202            &mut me.counts,
203            &mut me.actions.task,
204        )
205    }
206
207    pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
208        let mut me = self.inner.lock().unwrap();
209        let me = &mut *me;
210
211        me.actions.recv.apply_local_settings(frame, &mut me.store)
212    }
213
214    pub fn send_request(
215        &mut self,
216        mut request: Request<()>,
217        end_of_stream: bool,
218        pending: Option<&OpaqueStreamRef>,
219    ) -> Result<(StreamRef<B>, bool), SendError> {
220        use super::stream::ContentLength;
221        use http::Method;
222
223        let protocol = request.extensions_mut().remove::<Protocol>();
224
225        // Clear before taking lock, incase extensions contain a StreamRef.
226        request.extensions_mut().clear();
227
228        // TODO: There is a hazard with assigning a stream ID before the
229        // prioritize layer. If prioritization reorders new streams, this
230        // implicitly closes the earlier stream IDs.
231        //
232        // See: hyperium/h2#11
233        let mut me = self.inner.lock().unwrap();
234        let me = &mut *me;
235
236        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
237        let send_buffer = &mut *send_buffer;
238
239        me.actions.ensure_no_conn_error()?;
240        me.actions.send.ensure_next_stream_id()?;
241
242        // The `pending` argument is provided by the `Client`, and holds
243        // a store `Key` of a `Stream` that may have been not been opened
244        // yet.
245        //
246        // If that stream is still pending, the Client isn't allowed to
247        // queue up another pending stream. They should use `poll_ready`.
248        if let Some(stream) = pending {
249            if me.store.resolve(stream.key).is_pending_open {
250                return Err(UserError::Rejected.into());
251            }
252        }
253
254        if me.counts.peer().is_server() {
255            // Servers cannot open streams. PushPromise must first be reserved.
256            return Err(UserError::UnexpectedFrameType.into());
257        }
258
259        let stream_id = me.actions.send.open()?;
260
261        let mut stream = Stream::new(
262            stream_id,
263            me.actions.send.init_window_sz(),
264            me.actions.recv.init_window_sz(),
265        );
266
267        if *request.method() == Method::HEAD {
268            stream.content_length = ContentLength::Head;
269        }
270
271        // Convert the message
272        let headers =
273            client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
274
275        let mut stream = me.store.insert(stream.id, stream);
276
277        let sent = me.actions.send.send_headers(
278            headers,
279            send_buffer,
280            &mut stream,
281            &mut me.counts,
282            &mut me.actions.task,
283        );
284
285        // send_headers can return a UserError, if it does,
286        // we should forget about this stream.
287        if let Err(err) = sent {
288            stream.unlink();
289            stream.remove();
290            return Err(err.into());
291        }
292
293        // Given that the stream has been initialized, it should not be in the
294        // closed state.
295        debug_assert!(!stream.state.is_closed());
296
297        // TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
298        // the lock, so it can't.
299        me.refs += 1;
300
301        let is_full = me.counts.next_send_stream_will_reach_capacity();
302        Ok((
303            StreamRef {
304                opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream),
305                send_buffer: self.send_buffer.clone(),
306            },
307            is_full,
308        ))
309    }
310
311    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
312        self.inner
313            .lock()
314            .unwrap()
315            .actions
316            .send
317            .is_extended_connect_protocol_enabled()
318    }
319}
320
321impl<B> DynStreams<'_, B> {
322    pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> {
323        let mut me = self.inner.lock().unwrap();
324
325        me.recv_headers(self.peer, self.send_buffer, frame)
326    }
327
328    pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), Error> {
329        let mut me = self.inner.lock().unwrap();
330        me.recv_data(self.peer, self.send_buffer, frame)
331    }
332
333    pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), Error> {
334        let mut me = self.inner.lock().unwrap();
335
336        me.recv_reset(self.send_buffer, frame)
337    }
338
339    /// Notify all streams that a connection-level error happened.
340    pub fn handle_error(&mut self, err: proto::Error) -> StreamId {
341        let mut me = self.inner.lock().unwrap();
342        me.handle_error(self.send_buffer, err)
343    }
344
345    pub fn recv_go_away(&mut self, frame: &frame::GoAway) -> Result<(), Error> {
346        let mut me = self.inner.lock().unwrap();
347        me.recv_go_away(self.send_buffer, frame)
348    }
349
350    pub fn last_processed_id(&self) -> StreamId {
351        self.inner.lock().unwrap().actions.recv.last_processed_id()
352    }
353
354    pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), Error> {
355        let mut me = self.inner.lock().unwrap();
356        me.recv_window_update(self.send_buffer, frame)
357    }
358
359    pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), Error> {
360        let mut me = self.inner.lock().unwrap();
361        me.recv_push_promise(self.send_buffer, frame)
362    }
363
364    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
365        let mut me = self.inner.lock().map_err(|_| ())?;
366        me.recv_eof(self.send_buffer, clear_pending_accept)
367    }
368
369    pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
370        let mut me = self.inner.lock().unwrap();
371        me.send_reset(self.send_buffer, id, reason)
372    }
373
374    pub fn send_go_away(&mut self, last_processed_id: StreamId) {
375        let mut me = self.inner.lock().unwrap();
376        me.actions.recv.go_away(last_processed_id);
377    }
378}
379
380impl Inner {
381    fn new(peer: peer::Dyn, config: Config) -> Arc<Mutex<Self>> {
382        Arc::new(Mutex::new(Inner {
383            counts: Counts::new(peer, &config),
384            actions: Actions {
385                recv: Recv::new(peer, &config),
386                send: Send::new(&config),
387                task: None,
388                conn_error: None,
389            },
390            store: Store::new(),
391            refs: 1,
392        }))
393    }
394
395    fn recv_headers<B>(
396        &mut self,
397        peer: peer::Dyn,
398        send_buffer: &SendBuffer<B>,
399        frame: frame::Headers,
400    ) -> Result<(), Error> {
401        let id = frame.stream_id();
402
403        // The GOAWAY process has begun. All streams with a greater ID than
404        // specified as part of GOAWAY should be ignored.
405        if id > self.actions.recv.max_stream_id() {
406            tracing::trace!(
407                "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS",
408                id,
409                self.actions.recv.max_stream_id()
410            );
411            return Ok(());
412        }
413
414        let key = match self.store.find_entry(id) {
415            Entry::Occupied(e) => e.key(),
416            Entry::Vacant(e) => {
417                // Client: it's possible to send a request, and then send
418                // a RST_STREAM while the response HEADERS were in transit.
419                //
420                // Server: we can't reset a stream before having received
421                // the request headers, so don't allow.
422                if !peer.is_server() {
423                    // This may be response headers for a stream we've already
424                    // forgotten about...
425                    if self.actions.may_have_forgotten_stream(peer, id) {
426                        tracing::debug!(
427                            "recv_headers for old stream={:?}, sending STREAM_CLOSED",
428                            id,
429                        );
430                        return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
431                    }
432                }
433
434                match self
435                    .actions
436                    .recv
437                    .open(id, Open::Headers, &mut self.counts)?
438                {
439                    Some(stream_id) => {
440                        let stream = Stream::new(
441                            stream_id,
442                            self.actions.send.init_window_sz(),
443                            self.actions.recv.init_window_sz(),
444                        );
445
446                        e.insert(stream)
447                    }
448                    None => return Ok(()),
449                }
450            }
451        };
452
453        let stream = self.store.resolve(key);
454
455        if stream.state.is_local_error() {
456            // Locally reset streams must ignore frames "for some time".
457            // This is because the remote may have sent trailers before
458            // receiving the RST_STREAM frame.
459            tracing::trace!("recv_headers; ignoring trailers on {:?}", stream.id);
460            return Ok(());
461        }
462
463        let actions = &mut self.actions;
464        let mut send_buffer = send_buffer.inner.lock().unwrap();
465        let send_buffer = &mut *send_buffer;
466
467        self.counts.transition(stream, |counts, stream| {
468            tracing::trace!(
469                "recv_headers; stream={:?}; state={:?}",
470                stream.id,
471                stream.state
472            );
473
474            let res = if stream.state.is_recv_headers() {
475                match actions.recv.recv_headers(frame, stream, counts) {
476                    Ok(()) => Ok(()),
477                    Err(RecvHeaderBlockError::Oversize(resp)) => {
478                        if let Some(resp) = resp {
479                            let sent = actions.send.send_headers(
480                                resp, send_buffer, stream, counts, &mut actions.task);
481                            debug_assert!(sent.is_ok(), "oversize response should not fail");
482
483                            actions.send.schedule_implicit_reset(
484                                stream,
485                                Reason::REFUSED_STREAM,
486                                counts,
487                                &mut actions.task);
488
489                            actions.recv.enqueue_reset_expiration(stream, counts);
490
491                            Ok(())
492                        } else {
493                            Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
494                        }
495                    },
496                    Err(RecvHeaderBlockError::State(err)) => Err(err),
497                }
498            } else {
499                if !frame.is_end_stream() {
500                    // Receiving trailers that don't set EOS is a "malformed"
501                    // message. Malformed messages are a stream error.
502                    proto_err!(stream: "recv_headers: trailers frame was not EOS; stream={:?}", stream.id);
503                    return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
504                }
505
506                actions.recv.recv_trailers(frame, stream)
507            };
508
509            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
510        })
511    }
512
513    fn recv_data<B>(
514        &mut self,
515        peer: peer::Dyn,
516        send_buffer: &SendBuffer<B>,
517        frame: frame::Data,
518    ) -> Result<(), Error> {
519        let id = frame.stream_id();
520
521        let stream = match self.store.find_mut(&id) {
522            Some(stream) => stream,
523            None => {
524                // The GOAWAY process has begun. All streams with a greater ID
525                // than specified as part of GOAWAY should be ignored.
526                if id > self.actions.recv.max_stream_id() {
527                    tracing::trace!(
528                        "id ({:?}) > max_stream_id ({:?}), ignoring DATA",
529                        id,
530                        self.actions.recv.max_stream_id()
531                    );
532                    return Ok(());
533                }
534
535                if self.actions.may_have_forgotten_stream(peer, id) {
536                    tracing::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,);
537
538                    let sz = frame.payload().len();
539                    // This should have been enforced at the codec::FramedRead layer, so
540                    // this is just a sanity check.
541                    assert!(sz <= super::MAX_WINDOW_SIZE as usize);
542                    let sz = sz as WindowSize;
543
544                    self.actions.recv.ignore_data(sz)?;
545                    return Err(Error::library_reset(id, Reason::STREAM_CLOSED));
546                }
547
548                proto_err!(conn: "recv_data: stream not found; id={:?}", id);
549                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
550            }
551        };
552
553        let actions = &mut self.actions;
554        let mut send_buffer = send_buffer.inner.lock().unwrap();
555        let send_buffer = &mut *send_buffer;
556
557        self.counts.transition(stream, |counts, stream| {
558            let sz = frame.payload().len();
559            let res = actions.recv.recv_data(frame, stream);
560
561            // Any stream error after receiving a DATA frame means
562            // we won't give the data to the user, and so they can't
563            // release the capacity. We do it automatically.
564            if let Err(Error::Reset(..)) = res {
565                actions
566                    .recv
567                    .release_connection_capacity(sz as WindowSize, &mut None);
568            }
569            actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
570        })
571    }
572
573    fn recv_reset<B>(
574        &mut self,
575        send_buffer: &SendBuffer<B>,
576        frame: frame::Reset,
577    ) -> Result<(), Error> {
578        let id = frame.stream_id();
579
580        if id.is_zero() {
581            proto_err!(conn: "recv_reset: invalid stream ID 0");
582            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
583        }
584
585        // The GOAWAY process has begun. All streams with a greater ID than
586        // specified as part of GOAWAY should be ignored.
587        if id > self.actions.recv.max_stream_id() {
588            tracing::trace!(
589                "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM",
590                id,
591                self.actions.recv.max_stream_id()
592            );
593            return Ok(());
594        }
595
596        let stream = match self.store.find_mut(&id) {
597            Some(stream) => stream,
598            None => {
599                // TODO: Are there other error cases?
600                self.actions
601                    .ensure_not_idle(self.counts.peer(), id)
602                    .map_err(Error::library_go_away)?;
603
604                return Ok(());
605            }
606        };
607
608        let mut send_buffer = send_buffer.inner.lock().unwrap();
609        let send_buffer = &mut *send_buffer;
610
611        let actions = &mut self.actions;
612
613        self.counts.transition(stream, |counts, stream| {
614            actions.recv.recv_reset(frame, stream, counts)?;
615            actions.send.handle_error(send_buffer, stream, counts);
616            assert!(stream.state.is_closed());
617            Ok(())
618        })
619    }
620
621    fn recv_window_update<B>(
622        &mut self,
623        send_buffer: &SendBuffer<B>,
624        frame: frame::WindowUpdate,
625    ) -> Result<(), Error> {
626        let id = frame.stream_id();
627
628        let mut send_buffer = send_buffer.inner.lock().unwrap();
629        let send_buffer = &mut *send_buffer;
630
631        if id.is_zero() {
632            self.actions
633                .send
634                .recv_connection_window_update(frame, &mut self.store, &mut self.counts)
635                .map_err(Error::library_go_away)?;
636        } else {
637            // The remote may send window updates for streams that the local now
638            // considers closed. It's ok...
639            if let Some(mut stream) = self.store.find_mut(&id) {
640                // This result is ignored as there is nothing to do when there
641                // is an error. The stream is reset by the function on error and
642                // the error is informational.
643                let _ = self.actions.send.recv_stream_window_update(
644                    frame.size_increment(),
645                    send_buffer,
646                    &mut stream,
647                    &mut self.counts,
648                    &mut self.actions.task,
649                );
650            } else {
651                self.actions
652                    .ensure_not_idle(self.counts.peer(), id)
653                    .map_err(Error::library_go_away)?;
654            }
655        }
656
657        Ok(())
658    }
659
660    fn handle_error<B>(&mut self, send_buffer: &SendBuffer<B>, err: proto::Error) -> StreamId {
661        let actions = &mut self.actions;
662        let counts = &mut self.counts;
663        let mut send_buffer = send_buffer.inner.lock().unwrap();
664        let send_buffer = &mut *send_buffer;
665
666        let last_processed_id = actions.recv.last_processed_id();
667
668        self.store.for_each(|stream| {
669            counts.transition(stream, |counts, stream| {
670                actions.recv.handle_error(&err, &mut *stream);
671                actions.send.handle_error(send_buffer, stream, counts);
672            })
673        });
674
675        actions.conn_error = Some(err);
676
677        last_processed_id
678    }
679
680    fn recv_go_away<B>(
681        &mut self,
682        send_buffer: &SendBuffer<B>,
683        frame: &frame::GoAway,
684    ) -> Result<(), Error> {
685        let actions = &mut self.actions;
686        let counts = &mut self.counts;
687        let mut send_buffer = send_buffer.inner.lock().unwrap();
688        let send_buffer = &mut *send_buffer;
689
690        let last_stream_id = frame.last_stream_id();
691
692        actions.send.recv_go_away(last_stream_id)?;
693
694        let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
695
696        self.store.for_each(|stream| {
697            if stream.id > last_stream_id {
698                counts.transition(stream, |counts, stream| {
699                    actions.recv.handle_error(&err, &mut *stream);
700                    actions.send.handle_error(send_buffer, stream, counts);
701                })
702            }
703        });
704
705        actions.conn_error = Some(err);
706
707        Ok(())
708    }
709
710    fn recv_push_promise<B>(
711        &mut self,
712        send_buffer: &SendBuffer<B>,
713        frame: frame::PushPromise,
714    ) -> Result<(), Error> {
715        let id = frame.stream_id();
716        let promised_id = frame.promised_id();
717
718        // First, ensure that the initiating stream is still in a valid state.
719        let parent_key = match self.store.find_mut(&id) {
720            Some(stream) => {
721                // The GOAWAY process has begun. All streams with a greater ID
722                // than specified as part of GOAWAY should be ignored.
723                if id > self.actions.recv.max_stream_id() {
724                    tracing::trace!(
725                        "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE",
726                        id,
727                        self.actions.recv.max_stream_id()
728                    );
729                    return Ok(());
730                }
731
732                // The stream must be receive open
733                if !stream.state.ensure_recv_open()? {
734                    proto_err!(conn: "recv_push_promise: initiating stream is not opened");
735                    return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
736                }
737
738                stream.key()
739            }
740            None => {
741                proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state");
742                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
743            }
744        };
745
746        // TODO: Streams in the reserved states do not count towards the concurrency
747        // limit. However, it seems like there should be a cap otherwise this
748        // could grow in memory indefinitely.
749
750        // Ensure that we can reserve streams
751        self.actions.recv.ensure_can_reserve()?;
752
753        // Next, open the stream.
754        //
755        // If `None` is returned, then the stream is being refused. There is no
756        // further work to be done.
757        if self
758            .actions
759            .recv
760            .open(promised_id, Open::PushPromise, &mut self.counts)?
761            .is_none()
762        {
763            return Ok(());
764        }
765
766        // Try to handle the frame and create a corresponding key for the pushed stream
767        // this requires a bit of indirection to make the borrow checker happy.
768        let child_key: Option<store::Key> = {
769            // Create state for the stream
770            let stream = self.store.insert(promised_id, {
771                Stream::new(
772                    promised_id,
773                    self.actions.send.init_window_sz(),
774                    self.actions.recv.init_window_sz(),
775                )
776            });
777
778            let actions = &mut self.actions;
779
780            self.counts.transition(stream, |counts, stream| {
781                let stream_valid = actions.recv.recv_push_promise(frame, stream);
782
783                match stream_valid {
784                    Ok(()) => Ok(Some(stream.key())),
785                    _ => {
786                        let mut send_buffer = send_buffer.inner.lock().unwrap();
787                        actions
788                            .reset_on_recv_stream_err(
789                                &mut *send_buffer,
790                                stream,
791                                counts,
792                                stream_valid,
793                            )
794                            .map(|()| None)
795                    }
796                }
797            })?
798        };
799        // If we're successful, push the headers and stream...
800        if let Some(child) = child_key {
801            let mut ppp = self.store[parent_key].pending_push_promises.take();
802            ppp.push(&mut self.store.resolve(child));
803
804            let parent = &mut self.store.resolve(parent_key);
805            parent.pending_push_promises = ppp;
806            parent.notify_recv();
807        };
808
809        Ok(())
810    }
811
812    fn recv_eof<B>(
813        &mut self,
814        send_buffer: &SendBuffer<B>,
815        clear_pending_accept: bool,
816    ) -> Result<(), ()> {
817        let actions = &mut self.actions;
818        let counts = &mut self.counts;
819        let mut send_buffer = send_buffer.inner.lock().unwrap();
820        let send_buffer = &mut *send_buffer;
821
822        if actions.conn_error.is_none() {
823            actions.conn_error = Some(
824                io::Error::new(
825                    io::ErrorKind::BrokenPipe,
826                    "connection closed because of a broken pipe",
827                )
828                .into(),
829            );
830        }
831
832        tracing::trace!("Streams::recv_eof");
833
834        self.store.for_each(|stream| {
835            counts.transition(stream, |counts, stream| {
836                actions.recv.recv_eof(stream);
837
838                // This handles resetting send state associated with the
839                // stream
840                actions.send.handle_error(send_buffer, stream, counts);
841            })
842        });
843
844        actions.clear_queues(clear_pending_accept, &mut self.store, counts);
845        Ok(())
846    }
847
848    fn poll_complete<T, B>(
849        &mut self,
850        send_buffer: &SendBuffer<B>,
851        cx: &mut Context,
852        dst: &mut Codec<T, Prioritized<B>>,
853    ) -> Poll<io::Result<()>>
854    where
855        T: AsyncWrite + Unpin,
856        B: Buf,
857    {
858        let mut send_buffer = send_buffer.inner.lock().unwrap();
859        let send_buffer = &mut *send_buffer;
860
861        // Send WINDOW_UPDATE frames first
862        //
863        // TODO: It would probably be better to interleave updates w/ data
864        // frames.
865        ready!(self
866            .actions
867            .recv
868            .poll_complete(cx, &mut self.store, &mut self.counts, dst))?;
869
870        // Send any other pending frames
871        ready!(self.actions.send.poll_complete(
872            cx,
873            send_buffer,
874            &mut self.store,
875            &mut self.counts,
876            dst
877        ))?;
878
879        // Nothing else to do, track the task
880        self.actions.task = Some(cx.waker().clone());
881
882        Poll::Ready(Ok(()))
883    }
884
885    fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
886        let key = match self.store.find_entry(id) {
887            Entry::Occupied(e) => e.key(),
888            Entry::Vacant(e) => {
889                // Resetting a stream we don't know about? That could be OK...
890                //
891                // 1. As a server, we just received a request, but that request
892                //    was bad, so we're resetting before even accepting it.
893                //    This is totally fine.
894                //
895                // 2. The remote may have sent us a frame on new stream that
896                //    it's *not* supposed to have done, and thus, we don't know
897                //    the stream. In that case, sending a reset will "open" the
898                //    stream in our store. Maybe that should be a connection
899                //    error instead? At least for now, we need to update what
900                //    our vision of the next stream is.
901                if self.counts.peer().is_local_init(id) {
902                    // We normally would open this stream, so update our
903                    // next-send-id record.
904                    self.actions.send.maybe_reset_next_stream_id(id);
905                } else {
906                    // We normally would recv this stream, so update our
907                    // next-recv-id record.
908                    self.actions.recv.maybe_reset_next_stream_id(id);
909                }
910
911                let stream = Stream::new(id, 0, 0);
912
913                e.insert(stream)
914            }
915        };
916
917        let stream = self.store.resolve(key);
918        let mut send_buffer = send_buffer.inner.lock().unwrap();
919        let send_buffer = &mut *send_buffer;
920        self.actions.send_reset(
921            stream,
922            reason,
923            Initiator::Library,
924            &mut self.counts,
925            send_buffer,
926        );
927    }
928}
929
930impl<B> Streams<B, client::Peer>
931where
932    B: Buf,
933{
934    pub fn poll_pending_open(
935        &mut self,
936        cx: &Context,
937        pending: Option<&OpaqueStreamRef>,
938    ) -> Poll<Result<(), crate::Error>> {
939        let mut me = self.inner.lock().unwrap();
940        let me = &mut *me;
941
942        me.actions.ensure_no_conn_error()?;
943        me.actions.send.ensure_next_stream_id()?;
944
945        if let Some(pending) = pending {
946            let mut stream = me.store.resolve(pending.key);
947            tracing::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open);
948            if stream.is_pending_open {
949                stream.wait_send(cx);
950                return Poll::Pending;
951            }
952        }
953        Poll::Ready(Ok(()))
954    }
955}
956
957impl<B, P> Streams<B, P>
958where
959    P: Peer,
960{
961    pub fn as_dyn(&self) -> DynStreams<B> {
962        let Self {
963            inner,
964            send_buffer,
965            _p,
966        } = self;
967        DynStreams {
968            inner,
969            send_buffer,
970            peer: P::r#dyn(),
971        }
972    }
973
974    /// This function is safe to call multiple times.
975    ///
976    /// A `Result` is returned to avoid panicking if the mutex is poisoned.
977    pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
978        self.as_dyn().recv_eof(clear_pending_accept)
979    }
980
981    pub(crate) fn max_send_streams(&self) -> usize {
982        self.inner.lock().unwrap().counts.max_send_streams()
983    }
984
985    pub(crate) fn max_recv_streams(&self) -> usize {
986        self.inner.lock().unwrap().counts.max_recv_streams()
987    }
988
989    #[cfg(feature = "unstable")]
990    pub fn num_active_streams(&self) -> usize {
991        let me = self.inner.lock().unwrap();
992        me.store.num_active_streams()
993    }
994
995    pub fn has_streams(&self) -> bool {
996        let me = self.inner.lock().unwrap();
997        me.counts.has_streams()
998    }
999
1000    pub fn has_streams_or_other_references(&self) -> bool {
1001        let me = self.inner.lock().unwrap();
1002        me.counts.has_streams() || me.refs > 1
1003    }
1004
1005    #[cfg(feature = "unstable")]
1006    pub fn num_wired_streams(&self) -> usize {
1007        let me = self.inner.lock().unwrap();
1008        me.store.num_wired_streams()
1009    }
1010}
1011
1012// no derive because we don't need B and P to be Clone.
1013impl<B, P> Clone for Streams<B, P>
1014where
1015    P: Peer,
1016{
1017    fn clone(&self) -> Self {
1018        self.inner.lock().unwrap().refs += 1;
1019        Streams {
1020            inner: self.inner.clone(),
1021            send_buffer: self.send_buffer.clone(),
1022            _p: ::std::marker::PhantomData,
1023        }
1024    }
1025}
1026
1027impl<B, P> Drop for Streams<B, P>
1028where
1029    P: Peer,
1030{
1031    fn drop(&mut self) {
1032        if let Ok(mut inner) = self.inner.lock() {
1033            inner.refs -= 1;
1034            if inner.refs == 1 {
1035                if let Some(task) = inner.actions.task.take() {
1036                    task.wake();
1037                }
1038            }
1039        }
1040    }
1041}
1042
1043// ===== impl StreamRef =====
1044
1045impl<B> StreamRef<B> {
1046    pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
1047    where
1048        B: Buf,
1049    {
1050        let mut me = self.opaque.inner.lock().unwrap();
1051        let me = &mut *me;
1052
1053        let stream = me.store.resolve(self.opaque.key);
1054        let actions = &mut me.actions;
1055        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1056        let send_buffer = &mut *send_buffer;
1057
1058        me.counts.transition(stream, |counts, stream| {
1059            // Create the data frame
1060            let mut frame = frame::Data::new(stream.id, data);
1061            frame.set_end_stream(end_stream);
1062
1063            // Send the data frame
1064            actions
1065                .send
1066                .send_data(frame, send_buffer, stream, counts, &mut actions.task)
1067        })
1068    }
1069
1070    pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
1071        let mut me = self.opaque.inner.lock().unwrap();
1072        let me = &mut *me;
1073
1074        let stream = me.store.resolve(self.opaque.key);
1075        let actions = &mut me.actions;
1076        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1077        let send_buffer = &mut *send_buffer;
1078
1079        me.counts.transition(stream, |counts, stream| {
1080            // Create the trailers frame
1081            let frame = frame::Headers::trailers(stream.id, trailers);
1082
1083            // Send the trailers frame
1084            actions
1085                .send
1086                .send_trailers(frame, send_buffer, stream, counts, &mut actions.task)
1087        })
1088    }
1089
1090    pub fn send_reset(&mut self, reason: Reason) {
1091        let mut me = self.opaque.inner.lock().unwrap();
1092        let me = &mut *me;
1093
1094        let stream = me.store.resolve(self.opaque.key);
1095        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1096        let send_buffer = &mut *send_buffer;
1097
1098        me.actions
1099            .send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
1100    }
1101
1102    pub fn send_response(
1103        &mut self,
1104        mut response: Response<()>,
1105        end_of_stream: bool,
1106    ) -> Result<(), UserError> {
1107        // Clear before taking lock, incase extensions contain a StreamRef.
1108        response.extensions_mut().clear();
1109        let mut me = self.opaque.inner.lock().unwrap();
1110        let me = &mut *me;
1111
1112        let stream = me.store.resolve(self.opaque.key);
1113        let actions = &mut me.actions;
1114        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1115        let send_buffer = &mut *send_buffer;
1116
1117        me.counts.transition(stream, |counts, stream| {
1118            let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
1119
1120            actions
1121                .send
1122                .send_headers(frame, send_buffer, stream, counts, &mut actions.task)
1123        })
1124    }
1125
1126    pub fn send_push_promise(
1127        &mut self,
1128        mut request: Request<()>,
1129    ) -> Result<StreamRef<B>, UserError> {
1130        // Clear before taking lock, incase extensions contain a StreamRef.
1131        request.extensions_mut().clear();
1132        let mut me = self.opaque.inner.lock().unwrap();
1133        let me = &mut *me;
1134
1135        let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1136        let send_buffer = &mut *send_buffer;
1137
1138        let actions = &mut me.actions;
1139        let promised_id = actions.send.reserve_local()?;
1140
1141        let child_key = {
1142            let mut child_stream = me.store.insert(
1143                promised_id,
1144                Stream::new(
1145                    promised_id,
1146                    actions.send.init_window_sz(),
1147                    actions.recv.init_window_sz(),
1148                ),
1149            );
1150            child_stream.state.reserve_local()?;
1151            child_stream.is_pending_push = true;
1152            child_stream.key()
1153        };
1154
1155        let pushed = {
1156            let mut stream = me.store.resolve(self.opaque.key);
1157
1158            let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
1159
1160            actions
1161                .send
1162                .send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
1163        };
1164
1165        if let Err(err) = pushed {
1166            let mut child_stream = me.store.resolve(child_key);
1167            child_stream.unlink();
1168            child_stream.remove();
1169            return Err(err);
1170        }
1171
1172        me.refs += 1;
1173        let opaque =
1174            OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
1175
1176        Ok(StreamRef {
1177            opaque,
1178            send_buffer: self.send_buffer.clone(),
1179        })
1180    }
1181
1182    /// Called by the server after the stream is accepted. Given that clients
1183    /// initialize streams by sending HEADERS, the request will always be
1184    /// available.
1185    ///
1186    /// # Panics
1187    ///
1188    /// This function panics if the request isn't present.
1189    pub fn take_request(&self) -> Request<()> {
1190        let mut me = self.opaque.inner.lock().unwrap();
1191        let me = &mut *me;
1192
1193        let mut stream = me.store.resolve(self.opaque.key);
1194        me.actions.recv.take_request(&mut stream)
1195    }
1196
1197    /// Called by a client to see if the current stream is pending open
1198    pub fn is_pending_open(&self) -> bool {
1199        let mut me = self.opaque.inner.lock().unwrap();
1200        me.store.resolve(self.opaque.key).is_pending_open
1201    }
1202
1203    /// Request capacity to send data
1204    pub fn reserve_capacity(&mut self, capacity: WindowSize) {
1205        let mut me = self.opaque.inner.lock().unwrap();
1206        let me = &mut *me;
1207
1208        let mut stream = me.store.resolve(self.opaque.key);
1209
1210        me.actions
1211            .send
1212            .reserve_capacity(capacity, &mut stream, &mut me.counts)
1213    }
1214
1215    /// Returns the stream's current send capacity.
1216    pub fn capacity(&self) -> WindowSize {
1217        let mut me = self.opaque.inner.lock().unwrap();
1218        let me = &mut *me;
1219
1220        let mut stream = me.store.resolve(self.opaque.key);
1221
1222        me.actions.send.capacity(&mut stream)
1223    }
1224
1225    /// Request to be notified when the stream's capacity increases
1226    pub fn poll_capacity(&mut self, cx: &Context) -> Poll<Option<Result<WindowSize, UserError>>> {
1227        let mut me = self.opaque.inner.lock().unwrap();
1228        let me = &mut *me;
1229
1230        let mut stream = me.store.resolve(self.opaque.key);
1231
1232        me.actions.send.poll_capacity(cx, &mut stream)
1233    }
1234
1235    /// Request to be notified for if a `RST_STREAM` is received for this stream.
1236    pub(crate) fn poll_reset(
1237        &mut self,
1238        cx: &Context,
1239        mode: proto::PollReset,
1240    ) -> Poll<Result<Reason, crate::Error>> {
1241        let mut me = self.opaque.inner.lock().unwrap();
1242        let me = &mut *me;
1243
1244        let mut stream = me.store.resolve(self.opaque.key);
1245
1246        me.actions
1247            .send
1248            .poll_reset(cx, &mut stream, mode)
1249            .map_err(From::from)
1250    }
1251
1252    pub fn clone_to_opaque(&self) -> OpaqueStreamRef {
1253        self.opaque.clone()
1254    }
1255
1256    pub fn stream_id(&self) -> StreamId {
1257        self.opaque.stream_id()
1258    }
1259}
1260
1261impl<B> Clone for StreamRef<B> {
1262    fn clone(&self) -> Self {
1263        StreamRef {
1264            opaque: self.opaque.clone(),
1265            send_buffer: self.send_buffer.clone(),
1266        }
1267    }
1268}
1269
1270// ===== impl OpaqueStreamRef =====
1271
1272impl OpaqueStreamRef {
1273    fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
1274        stream.ref_inc();
1275        OpaqueStreamRef {
1276            inner,
1277            key: stream.key(),
1278        }
1279    }
1280    /// Called by a client to check for a received response.
1281    pub fn poll_response(&mut self, cx: &Context) -> Poll<Result<Response<()>, proto::Error>> {
1282        let mut me = self.inner.lock().unwrap();
1283        let me = &mut *me;
1284
1285        let mut stream = me.store.resolve(self.key);
1286
1287        me.actions.recv.poll_response(cx, &mut stream)
1288    }
1289    /// Called by a client to check for a pushed request.
1290    pub fn poll_pushed(
1291        &mut self,
1292        cx: &Context,
1293    ) -> Poll<Option<Result<(Request<()>, OpaqueStreamRef), proto::Error>>> {
1294        let mut me = self.inner.lock().unwrap();
1295        let me = &mut *me;
1296
1297        let mut stream = me.store.resolve(self.key);
1298        me.actions
1299            .recv
1300            .poll_pushed(cx, &mut stream)
1301            .map_ok(|(h, key)| {
1302                me.refs += 1;
1303                let opaque_ref =
1304                    OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
1305                (h, opaque_ref)
1306            })
1307    }
1308
1309    pub fn is_end_stream(&self) -> bool {
1310        let mut me = self.inner.lock().unwrap();
1311        let me = &mut *me;
1312
1313        let stream = me.store.resolve(self.key);
1314
1315        me.actions.recv.is_end_stream(&stream)
1316    }
1317
1318    pub fn poll_data(&mut self, cx: &Context) -> Poll<Option<Result<Bytes, proto::Error>>> {
1319        let mut me = self.inner.lock().unwrap();
1320        let me = &mut *me;
1321
1322        let mut stream = me.store.resolve(self.key);
1323
1324        me.actions.recv.poll_data(cx, &mut stream)
1325    }
1326
1327    pub fn poll_trailers(&mut self, cx: &Context) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
1328        let mut me = self.inner.lock().unwrap();
1329        let me = &mut *me;
1330
1331        let mut stream = me.store.resolve(self.key);
1332
1333        me.actions.recv.poll_trailers(cx, &mut stream)
1334    }
1335
1336    pub(crate) fn available_recv_capacity(&self) -> isize {
1337        let me = self.inner.lock().unwrap();
1338        let me = &*me;
1339
1340        let stream = &me.store[self.key];
1341        stream.recv_flow.available().into()
1342    }
1343
1344    pub(crate) fn used_recv_capacity(&self) -> WindowSize {
1345        let me = self.inner.lock().unwrap();
1346        let me = &*me;
1347
1348        let stream = &me.store[self.key];
1349        stream.in_flight_recv_data
1350    }
1351
1352    /// Releases recv capacity back to the peer. This may result in sending
1353    /// WINDOW_UPDATE frames on both the stream and connection.
1354    pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
1355        let mut me = self.inner.lock().unwrap();
1356        let me = &mut *me;
1357
1358        let mut stream = me.store.resolve(self.key);
1359
1360        me.actions
1361            .recv
1362            .release_capacity(capacity, &mut stream, &mut me.actions.task)
1363    }
1364
1365    /// Clear the receive queue and set the status to no longer receive data frames.
1366    pub(crate) fn clear_recv_buffer(&mut self) {
1367        let mut me = self.inner.lock().unwrap();
1368        let me = &mut *me;
1369
1370        let mut stream = me.store.resolve(self.key);
1371        stream.is_recv = false;
1372        me.actions.recv.clear_recv_buffer(&mut stream);
1373    }
1374
1375    pub fn stream_id(&self) -> StreamId {
1376        self.inner.lock().unwrap().store[self.key].id
1377    }
1378}
1379
1380impl fmt::Debug for OpaqueStreamRef {
1381    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1382        use std::sync::TryLockError::*;
1383
1384        match self.inner.try_lock() {
1385            Ok(me) => {
1386                let stream = &me.store[self.key];
1387                fmt.debug_struct("OpaqueStreamRef")
1388                    .field("stream_id", &stream.id)
1389                    .field("ref_count", &stream.ref_count)
1390                    .finish()
1391            }
1392            Err(Poisoned(_)) => fmt
1393                .debug_struct("OpaqueStreamRef")
1394                .field("inner", &"<Poisoned>")
1395                .finish(),
1396            Err(WouldBlock) => fmt
1397                .debug_struct("OpaqueStreamRef")
1398                .field("inner", &"<Locked>")
1399                .finish(),
1400        }
1401    }
1402}
1403
1404impl Clone for OpaqueStreamRef {
1405    fn clone(&self) -> Self {
1406        // Increment the ref count
1407        let mut inner = self.inner.lock().unwrap();
1408        inner.store.resolve(self.key).ref_inc();
1409        inner.refs += 1;
1410
1411        OpaqueStreamRef {
1412            inner: self.inner.clone(),
1413            key: self.key,
1414        }
1415    }
1416}
1417
1418impl Drop for OpaqueStreamRef {
1419    fn drop(&mut self) {
1420        drop_stream_ref(&self.inner, self.key);
1421    }
1422}
1423
1424// TODO: Move back in fn above
1425fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
1426    let mut me = match inner.lock() {
1427        Ok(inner) => inner,
1428        Err(_) => {
1429            if ::std::thread::panicking() {
1430                tracing::trace!("StreamRef::drop; mutex poisoned");
1431                return;
1432            } else {
1433                panic!("StreamRef::drop; mutex poisoned");
1434            }
1435        }
1436    };
1437
1438    let me = &mut *me;
1439    me.refs -= 1;
1440    let mut stream = me.store.resolve(key);
1441
1442    tracing::trace!("drop_stream_ref; stream={:?}", stream);
1443
1444    // decrement the stream's ref count by 1.
1445    stream.ref_dec();
1446
1447    let actions = &mut me.actions;
1448
1449    // If the stream is not referenced and it is already
1450    // closed (does not have to go through logic below
1451    // of canceling the stream), we should notify the task
1452    // (connection) so that it can close properly
1453    if stream.ref_count == 0 && stream.is_closed() {
1454        if let Some(task) = actions.task.take() {
1455            task.wake();
1456        }
1457    }
1458
1459    me.counts.transition(stream, |counts, stream| {
1460        maybe_cancel(stream, actions, counts);
1461
1462        if stream.ref_count == 0 {
1463            // Release any recv window back to connection, no one can access
1464            // it anymore.
1465            actions
1466                .recv
1467                .release_closed_capacity(stream, &mut actions.task);
1468
1469            // We won't be able to reach our push promises anymore
1470            let mut ppp = stream.pending_push_promises.take();
1471            while let Some(promise) = ppp.pop(stream.store_mut()) {
1472                counts.transition(promise, |counts, stream| {
1473                    maybe_cancel(stream, actions, counts);
1474                });
1475            }
1476        }
1477    });
1478}
1479
1480fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
1481    if stream.is_canceled_interest() {
1482        // Server is allowed to early respond without fully consuming the client input stream
1483        // But per the RFC, must send a RST_STREAM(NO_ERROR) in such cases. https://www.rfc-editor.org/rfc/rfc7540#section-8.1
1484        // Some other http2 implementation may interpret other error code as fatal if not respected (i.e: nginx https://trac.nginx.org/nginx/ticket/2376)
1485        let reason = if counts.peer().is_server()
1486            && stream.state.is_send_closed()
1487            && stream.state.is_recv_streaming()
1488        {
1489            Reason::NO_ERROR
1490        } else {
1491            Reason::CANCEL
1492        };
1493
1494        actions
1495            .send
1496            .schedule_implicit_reset(stream, reason, counts, &mut actions.task);
1497        actions.recv.enqueue_reset_expiration(stream, counts);
1498    }
1499}
1500
1501// ===== impl SendBuffer =====
1502
1503impl<B> SendBuffer<B> {
1504    fn new() -> Self {
1505        let inner = Mutex::new(Buffer::new());
1506        SendBuffer { inner }
1507    }
1508}
1509
1510// ===== impl Actions =====
1511
1512impl Actions {
1513    fn send_reset<B>(
1514        &mut self,
1515        stream: store::Ptr,
1516        reason: Reason,
1517        initiator: Initiator,
1518        counts: &mut Counts,
1519        send_buffer: &mut Buffer<Frame<B>>,
1520    ) {
1521        counts.transition(stream, |counts, stream| {
1522            self.send.send_reset(
1523                reason,
1524                initiator,
1525                send_buffer,
1526                stream,
1527                counts,
1528                &mut self.task,
1529            );
1530            self.recv.enqueue_reset_expiration(stream, counts);
1531            // if a RecvStream is parked, ensure it's notified
1532            stream.notify_recv();
1533        });
1534    }
1535
1536    fn reset_on_recv_stream_err<B>(
1537        &mut self,
1538        buffer: &mut Buffer<Frame<B>>,
1539        stream: &mut store::Ptr,
1540        counts: &mut Counts,
1541        res: Result<(), Error>,
1542    ) -> Result<(), Error> {
1543        if let Err(Error::Reset(stream_id, reason, initiator)) = res {
1544            debug_assert_eq!(stream_id, stream.id);
1545
1546            if counts.can_inc_num_local_error_resets() {
1547                counts.inc_num_local_error_resets();
1548
1549                // Reset the stream.
1550                self.send
1551                    .send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
1552                Ok(())
1553            } else {
1554                tracing::warn!(
1555                    "reset_on_recv_stream_err; locally-reset streams reached limit ({:?})",
1556                    counts.max_local_error_resets().unwrap(),
1557                );
1558                Err(Error::library_go_away_data(
1559                    Reason::ENHANCE_YOUR_CALM,
1560                    "too_many_internal_resets",
1561                ))
1562            }
1563        } else {
1564            res
1565        }
1566    }
1567
1568    fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
1569        if peer.is_local_init(id) {
1570            self.send.ensure_not_idle(id)
1571        } else {
1572            self.recv.ensure_not_idle(id)
1573        }
1574    }
1575
1576    fn ensure_no_conn_error(&self) -> Result<(), proto::Error> {
1577        if let Some(ref err) = self.conn_error {
1578            Err(err.clone())
1579        } else {
1580            Ok(())
1581        }
1582    }
1583
1584    /// Check if we possibly could have processed and since forgotten this stream.
1585    ///
1586    /// If we send a RST_STREAM for a stream, we will eventually "forget" about
1587    /// the stream to free up memory. It's possible that the remote peer had
1588    /// frames in-flight, and by the time we receive them, our own state is
1589    /// gone. We *could* tear everything down by sending a GOAWAY, but it
1590    /// is more likely to be latency/memory constraints that caused this,
1591    /// and not a bad actor. So be less catastrophic, the spec allows
1592    /// us to send another RST_STREAM of STREAM_CLOSED.
1593    fn may_have_forgotten_stream(&self, peer: peer::Dyn, id: StreamId) -> bool {
1594        if id.is_zero() {
1595            return false;
1596        }
1597        if peer.is_local_init(id) {
1598            self.send.may_have_created_stream(id)
1599        } else {
1600            self.recv.may_have_created_stream(id)
1601        }
1602    }
1603
1604    fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) {
1605        self.recv.clear_queues(clear_pending_accept, store, counts);
1606        self.send.clear_queues(store, counts);
1607    }
1608}