h2/
server.rs

1//! Server implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 server requires the caller to manage accepting the
6//! connections as well as getting the connections to a state that is ready to
7//! begin the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpListener`] to accept
11//! connections, but usually it means using either ALPN or HTTP/1.1 protocol
12//! upgrades.
13//!
14//! Once a connection is obtained, it is passed to [`handshake`],
15//! which will begin the [HTTP/2 handshake]. This returns a future that
16//! completes once the handshake process is performed and HTTP/2 streams may
17//! be received.
18//!
19//! [`handshake`] uses default configuration values. There are a number of
20//! settings that can be changed by using [`Builder`] instead.
21//!
22//! # Inbound streams
23//!
//! The [`Connection`] instance is used to accept inbound HTTP/2 streams. When a
//! new stream is received, a call to [`Connection::accept`] will return
//! `(request, response)`. With the `stream` cargo feature enabled, `Connection`
//! also implements [`futures::Stream`], yielding the same pairs.
//! The `request` handle (of type [`http::Request<RecvStream>`]) contains the
//! HTTP request head as well as provides a way to receive the inbound data
//! stream and the trailers. The `response` handle (of type [`SendResponse`])
//! allows responding to the request, streaming the response payload, sending
//! trailers, and sending push promises.
32//!
33//! The send ([`SendStream`]) and receive ([`RecvStream`]) halves of the stream
34//! can be operated independently.
35//!
36//! # Managing the connection
37//!
//! The [`Connection`] instance is used to manage connection state. The caller
//! is required to call either [`Connection::accept`] or
//! [`Connection::poll_closed`] in order to advance the connection state. Simply
//! operating on [`SendStream`] or [`RecvStream`] will have no effect unless the
//! connection state is advanced.
43//!
//! It is not required to call **both** [`Connection::accept`] and
//! [`Connection::poll_closed`]. If the caller is ready to accept a new stream,
//! then only [`Connection::accept`] should be called. When the caller **does
//! not** want to accept a new stream, [`Connection::poll_closed`] should be
//! called.
49//!
//! The [`Connection`] instance should only be dropped once
//! [`Connection::poll_closed`] returns `Ready`. Once [`Connection::accept`]
//! returns `None`, there will no longer be any more inbound streams. At
//! this point, only [`Connection::poll_closed`] should be called.
54//!
55//! # Shutting down the server
56//!
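//! A graceful shutdown is started by calling
//! [`Connection::graceful_shutdown`](struct.Connection.html#method.graceful_shutdown).
//! The connection must continue to be polled afterwards until it closes. See
//! [issue #69](https://github.com/hyperium/h2/issues/69) for background.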
59//!
60//! # Example
61//!
62//! A basic HTTP/2 server example that runs over TCP and assumes [prior
63//! knowledge], i.e. both the client and the server assume that the TCP socket
64//! will use the HTTP/2 protocol without prior negotiation.
65//!
66//! ```no_run
67//! use h2::server;
68//! use http::{Response, StatusCode};
69//! use tokio::net::TcpListener;
70//!
71//! #[tokio::main]
72//! pub async fn main() {
73//!     let mut listener = TcpListener::bind("127.0.0.1:5928").await.unwrap();
74//!
75//!     // Accept all incoming TCP connections.
76//!     loop {
77//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
78//!             // Spawn a new task to process each connection.
79//!             tokio::spawn(async {
80//!                 // Start the HTTP/2 connection handshake
81//!                 let mut h2 = server::handshake(socket).await.unwrap();
82//!                 // Accept all inbound HTTP/2 streams sent over the
83//!                 // connection.
84//!                 while let Some(request) = h2.accept().await {
85//!                     let (request, mut respond) = request.unwrap();
86//!                     println!("Received request: {:?}", request);
87//!
88//!                     // Build a response with no body
89//!                     let response = Response::builder()
90//!                         .status(StatusCode::OK)
91//!                         .body(())
92//!                         .unwrap();
93//!
94//!                     // Send the response back to the client
95//!                     respond.send_response(response, true)
96//!                         .unwrap();
97//!                 }
98//!
99//!             });
100//!         }
101//!     }
102//! }
103//! ```
104//!
105//! [prior knowledge]: http://httpwg.org/specs/rfc7540.html#known-http
106//! [`handshake`]: fn.handshake.html
107//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
108//! [`Builder`]: struct.Builder.html
109//! [`Connection`]: struct.Connection.html
//! [`Connection::accept`]: struct.Connection.html#method.accept
//! [`Connection::poll_close`]: struct.Connection.html#method.poll_close
//! [`Connection::poll_closed`]: struct.Connection.html#method.poll_closed
//! [`futures::Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
113//! [`http::Request<RecvStream>`]: ../struct.RecvStream.html
114//! [`RecvStream`]: ../struct.RecvStream.html
115//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html
117
118use crate::codec::{Codec, UserError};
119use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
120use crate::proto::{self, Config, Error, Prioritized};
121use crate::{FlowControl, PingPong, RecvStream, SendStream};
122
123use bytes::{Buf, Bytes};
124use http::{HeaderMap, Method, Request, Response};
125use std::future::Future;
126use std::pin::Pin;
127use std::task::{Context, Poll};
128use std::time::Duration;
129use std::{fmt, io};
130use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
131use tracing::instrument::{Instrument, Instrumented};
132
/// In-progress HTTP/2 connection handshake future.
///
/// This type implements `Future`, yielding a `Connection` instance once the
/// handshake has completed.
///
/// The handshake is completed once the connection preface is fully received
/// from the client **and** the initial settings frame is sent to the client.
///
/// The handshake future does not wait for the initial settings frame from the
/// client.
///
/// `T` is the I/O resource the handshake runs over. `B` is the `Buf` type
/// parameter carried through to the handshake state; it defaults to `Bytes`.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B: Buf = Bytes> {
    /// The config to pass to Connection::new after handshake succeeds.
    builder: Builder,
    /// The current state of the handshake.
    state: Handshaking<T, B>,
    /// Span tracking the handshake (created as `server_handshake`).
    span: tracing::Span,
}
156
/// Accepts inbound HTTP/2 streams on a connection.
///
/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
/// implements the HTTP/2 server logic for that connection. It is responsible
/// for receiving inbound streams initiated by the client as well as driving the
/// internal state forward.
///
/// `Connection` values are created by calling [`handshake`]. Once a
/// `Connection` value is obtained, the caller must call [`accept`] or
/// [`poll_closed`] in order to drive the internal connection state forward.
///
/// See [module level] documentation for more details
///
/// [module level]: index.html
/// [`handshake`]: fn.handshake.html
/// [`accept`]: struct.Connection.html#method.accept
/// [`poll_closed`]: struct.Connection.html#method.poll_closed
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server;
/// # use h2::server::*;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.accept().await {
///     tokio::spawn(async move {
///         let (request, respond) = request.unwrap();
///         // Process the request and send the response back to the client
///         // using `respond`.
///     });
/// }
/// # }
/// #
/// # pub fn main() {}
/// ```
#[must_use = "streams do nothing unless polled"]
pub struct Connection<T, B: Buf> {
    // Protocol-level connection that every public method delegates to.
    connection: proto::Connection<T, Peer, B>,
}
199
/// Builds server connections with custom configuration values.
///
/// Methods can be chained in order to set the configuration values.
///
/// The server is constructed by calling [`handshake`] and passing the I/O
/// handle that will back the HTTP/2 server.
///
/// New instances of `Builder` are obtained via [`Builder::new`], which fills
/// every field with its default value.
///
/// See function level documentation for details on the various server
/// configuration settings.
///
/// [`Builder::new`]: struct.Builder.html#method.new
/// [`handshake`]: struct.Builder.html#method.handshake
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
///     .initial_window_size(1_000_000)
///     .max_concurrent_streams(1000)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
#[derive(Clone, Debug)]
pub struct Builder {
    /// Time to keep locally reset streams around before reaping.
    reset_stream_duration: Duration,

    /// Maximum number of locally reset streams to keep at a time.
    reset_stream_max: usize,

    /// Maximum number of remotely reset streams to allow in the pending
    /// accept queue.
    pending_accept_reset_stream_max: usize,

    /// Initial `Settings` frame to send as part of the handshake.
    ///
    /// Its frame-size and header-list-size values are also applied to the
    /// codec's receive limits when the handshake is started.
    settings: Settings,

    /// Initial target window size for new connections.
    ///
    /// `None` means no explicit target has been configured.
    initial_target_connection_window_size: Option<u32>,

    /// Maximum amount of bytes to "buffer" for writing per stream.
    max_send_buffer_size: usize,

    /// Maximum number of locally reset streams due to protocol error across
    /// the lifetime of the connection.
    ///
    /// When this gets exceeded, we issue GOAWAYs.
    local_max_error_reset_streams: Option<usize>,
}
262
/// Send a response back to the client.
///
/// A `SendResponse` instance is provided when receiving a request and is used
/// to send the associated response back to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It will also be used to initiate push promises linked with the associated
/// stream.
///
/// If the `SendResponse` instance is dropped without sending a response, then
/// the HTTP/2 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
#[derive(Debug)]
pub struct SendResponse<B: Buf> {
    // Handle to the stream this response belongs to.
    inner: proto::StreamRef<B>,
}
282
/// Send a response to a promised request.
///
/// A `SendPushedResponse` instance is provided when promising a request and is used
/// to send the associated response to the client. It is also used to
/// explicitly reset the stream with a custom reason.
///
/// It cannot be used to initiate push promises.
///
/// If the `SendPushedResponse` instance is dropped without sending a response, then
/// the HTTP/2 stream will be reset.
///
/// See [module] level docs for more details.
///
/// [module]: index.html
pub struct SendPushedResponse<B: Buf> {
    // Wraps a regular `SendResponse` for the promised stream.
    inner: SendResponse<B>,
}
300
301// Manual implementation necessary because of rust-lang/rust#26925
302impl<B: Buf + fmt::Debug> fmt::Debug for SendPushedResponse<B> {
303    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
304        write!(f, "SendPushedResponse {{ {:?} }}", self.inner)
305    }
306}
307
/// Stages of an in-progress handshake.
///
/// A new handshake starts in `Flushing`; the variants are numbered in the
/// order they are expected to be visited.
enum Handshaking<T, B: Buf> {
    /// State 1. Connection is flushing pending SETTINGS frame.
    Flushing(Instrumented<Flush<T, Prioritized<B>>>),
    /// State 2. Connection is waiting for the client preface.
    ReadingPreface(Instrumented<ReadPreface<T, Prioritized<B>>>),
    /// State 3. Handshake is done, polling again would panic.
    Done,
}
317
/// Flush a Sink
///
/// Holds the codec whose buffered frames are being flushed.
struct Flush<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed back on
    // completion — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
}
322
/// Read the client connection preface
struct ReadPreface<T, B> {
    // NOTE(review): presumably `None` once the codec has been handed back on
    // completion — confirm against the `Future` impl.
    codec: Option<Codec<T, B>>,
    // NOTE(review): looks like the number of preface bytes consumed so far —
    // confirm against the `Future` impl.
    pos: usize,
}
328
/// Marker type used as the peer parameter of `proto::Connection` for server
/// connections.
#[derive(Debug)]
pub(crate) struct Peer;
331
/// The 24-octet HTTP/2 client connection preface (RFC 7540, Section 3.5).
const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
333
334/// Creates a new configured HTTP/2 server with default configuration
335/// values backed by `io`.
336///
337/// It is expected that `io` already be in an appropriate state to commence
338/// the [HTTP/2 handshake]. See [Handshake] for more details.
339///
340/// Returns a future which resolves to the [`Connection`] instance once the
341/// HTTP/2 handshake has been completed. The returned [`Connection`]
342/// instance will be using default configuration values. Use [`Builder`] to
343/// customize the configuration values used by a [`Connection`] instance.
344///
345/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
346/// [Handshake]: ../index.html#handshake
347/// [`Connection`]: struct.Connection.html
348///
349/// # Examples
350///
351/// ```
352/// # use tokio::io::{AsyncRead, AsyncWrite};
353/// # use h2::server;
354/// # use h2::server::*;
355/// #
356/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
357/// # {
358/// let connection = server::handshake(my_io).await.unwrap();
359/// // The HTTP/2 handshake has completed, now use `connection` to
360/// // accept inbound HTTP/2 streams.
361/// # }
362/// #
363/// # pub fn main() {}
364/// ```
365pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
366where
367    T: AsyncRead + AsyncWrite + Unpin,
368{
369    Builder::new().handshake(io)
370}
371
372// ===== impl Connection =====
373
374impl<T, B> Connection<T, B>
375where
376    T: AsyncRead + AsyncWrite + Unpin,
377    B: Buf,
378{
379    fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
380        let span = tracing::trace_span!("server_handshake");
381        let entered = span.enter();
382
383        // Create the codec.
384        let mut codec = Codec::new(io);
385
386        if let Some(max) = builder.settings.max_frame_size() {
387            codec.set_max_recv_frame_size(max as usize);
388        }
389
390        if let Some(max) = builder.settings.max_header_list_size() {
391            codec.set_max_recv_header_list_size(max as usize);
392        }
393
394        // Send initial settings frame.
395        codec
396            .buffer(builder.settings.clone().into())
397            .expect("invalid SETTINGS frame");
398
399        // Create the handshake future.
400        let state =
401            Handshaking::Flushing(Flush::new(codec).instrument(tracing::trace_span!("flush")));
402
403        drop(entered);
404
405        Handshake {
406            builder,
407            state,
408            span,
409        }
410    }
411
412    /// Accept the next incoming request on this connection.
413    pub async fn accept(
414        &mut self,
415    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
416        futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await
417    }
418
419    #[doc(hidden)]
420    pub fn poll_accept(
421        &mut self,
422        cx: &mut Context<'_>,
423    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
424        // Always try to advance the internal state. Getting Pending also is
425        // needed to allow this function to return Pending.
426        if self.poll_closed(cx)?.is_ready() {
427            // If the socket is closed, don't return anything
428            // TODO: drop any pending streams
429            return Poll::Ready(None);
430        }
431
432        if let Some(inner) = self.connection.next_incoming() {
433            tracing::trace!("received incoming");
434            let (head, _) = inner.take_request().into_parts();
435            let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
436
437            let request = Request::from_parts(head, body);
438            let respond = SendResponse { inner };
439
440            return Poll::Ready(Some(Ok((request, respond))));
441        }
442
443        Poll::Pending
444    }
445
    /// Sets the target window size for the whole connection.
    ///
    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
    /// frame will be immediately sent to the remote, increasing the connection
    /// level window by `size - current_value`.
    ///
    /// If `size` is less than the current value, nothing will happen
    /// immediately. However, as window capacity is released by
    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
    /// out until the number of "in flight" bytes drops below `size`.
    ///
    /// The default value is 65,535.
    ///
    /// See [`FlowControl`] documentation for more details.
    ///
    /// # Panics
    ///
    /// Panics if `size` exceeds the maximum window size
    /// (`proto::MAX_WINDOW_SIZE`).
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    /// [library level]: ../index.html#flow-control
    pub fn set_target_window_size(&mut self, size: u32) {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        self.connection.set_target_window_size(size);
    }
467
    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The `SETTINGS` will be sent to the remote, and only applied once the
    /// remote acknowledges the change.
    ///
    /// This can be used to increase or decrease the window size for existing
    /// streams.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    ///
    /// # Panics
    ///
    /// Panics if `size` exceeds the maximum window size
    /// (`proto::MAX_WINDOW_SIZE`).
    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
        assert!(size <= proto::MAX_WINDOW_SIZE);
        // `?` converts the connection's error into `crate::Error`.
        self.connection.set_initial_window_size(size)?;
        Ok(())
    }
486
    /// Enables the [extended CONNECT protocol].
    ///
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
    ///
    /// # Errors
    ///
    /// Returns an error if a previous call is still pending acknowledgement
    /// from the remote endpoint.
    pub fn enable_connect_protocol(&mut self) -> Result<(), crate::Error> {
        // `?` converts the connection's error into `crate::Error`.
        self.connection.set_enable_connect_protocol()?;
        Ok(())
    }
499
    /// Returns `Ready` when the underlying connection has closed.
    ///
    /// If any new inbound streams are received during a call to `poll_closed`,
    /// they will be queued and returned on the next call to [`poll_accept`].
    ///
    /// This function will advance the internal connection state, driving
    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
    ///
    /// See [here](index.html#managing-the-connection) for more details.
    ///
    /// # Errors
    ///
    /// An error reported by the underlying connection is converted into
    /// `crate::Error` and returned as `Ready(Err(..))`.
    ///
    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
    /// [`RecvStream`]: ../struct.RecvStream.html
    /// [`SendStream`]: ../struct.SendStream.html
    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
        self.connection.poll(cx).map_err(Into::into)
    }
516
    // Deprecated alias kept for backwards compatibility; forwards to
    // `poll_closed` unchanged.
    #[doc(hidden)]
    #[deprecated(note = "renamed to poll_closed")]
    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
        self.poll_closed(cx)
    }
522
    /// Sets the connection to a GOAWAY state.
    ///
    /// Does not terminate the connection. Must continue being polled to close
    /// connection.
    ///
    /// After flushing the GOAWAY frame, the connection is closed. Any
    /// outstanding streams do not prevent the connection from closing. This
    /// should usually be reserved for shutting down when something bad
    /// external to `h2` has happened, and open streams cannot be properly
    /// handled.
    ///
    /// `reason` is passed to the connection as the user-supplied reason for
    /// going away.
    ///
    /// For graceful shutdowns, see [`graceful_shutdown`](Connection::graceful_shutdown).
    pub fn abrupt_shutdown(&mut self, reason: Reason) {
        self.connection.go_away_from_user(reason);
    }
538
    /// Starts a [graceful shutdown][1] process.
    ///
    /// Must continue being polled to close connection.
    ///
    /// It's possible to receive more requests after calling this method, since
    /// they might have been in-flight from the client already. After about
    /// 1 RTT, no new requests should be accepted. Once all active streams
    /// have completed, the connection is closed.
    ///
    /// For an immediate shutdown that does not wait for open streams, see
    /// [`abrupt_shutdown`](Connection::abrupt_shutdown).
    ///
    /// [1]: http://httpwg.org/specs/rfc7540.html#GOAWAY
    pub fn graceful_shutdown(&mut self) {
        self.connection.go_away_gracefully();
    }
552
    /// Takes a `PingPong` instance from the connection.
    ///
    /// Returns `Some` wrapping the connection's user-pings handle when it is
    /// still available, and `None` otherwise.
    ///
    /// # Note
    ///
    /// This may only be called once. Calling multiple times will return `None`.
    pub fn ping_pong(&mut self) -> Option<PingPong> {
        self.connection.take_user_pings().map(PingPong::new)
    }
561
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the server on this connection.
    ///
    /// This limit is configured by the client peer by sending the
    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
    /// This method returns the currently acknowledged value received from the
    /// remote.
    ///
    /// See also [`max_concurrent_recv_streams`](Connection::max_concurrent_recv_streams)
    /// for the limit in the opposite direction.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    pub fn max_concurrent_send_streams(&self) -> usize {
        self.connection.max_send_streams()
    }
574
    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the client on this connection.
    ///
    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
    /// parameter][1] sent in a `SETTINGS` frame that has been
    /// acknowledged by the remote peer. The value to be sent is configured by
    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
    /// with the remote peer.
    ///
    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
    /// [2]: struct.Builder.html#method.max_concurrent_streams
    pub fn max_concurrent_recv_streams(&self) -> usize {
        self.connection.max_recv_streams()
    }
589
    // Could disappear at anytime.
    //
    // Only compiled with the `unstable` feature; forwards to the connection's
    // `num_wired_streams` counter.
    #[doc(hidden)]
    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.connection.num_wired_streams()
    }
596}
597
// With the `stream` feature enabled, a `Connection` can be consumed as a
// `Stream` of incoming requests; each item is what `poll_accept` yields.
#[cfg(feature = "stream")]
impl<T, B> futures_core::Stream for Connection<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let conn = self.get_mut();
        conn.poll_accept(cx)
    }
}
610
611impl<T, B> fmt::Debug for Connection<T, B>
612where
613    T: fmt::Debug,
614    B: fmt::Debug + Buf,
615{
616    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
617        fmt.debug_struct("Connection")
618            .field("connection", &self.connection)
619            .finish()
620    }
621}
622
623// ===== impl Builder =====
624
625impl Builder {
626    /// Returns a new server builder instance initialized with default
627    /// configuration values.
628    ///
629    /// Configuration methods can be chained on the return value.
630    ///
631    /// # Examples
632    ///
633    /// ```
634    /// # use tokio::io::{AsyncRead, AsyncWrite};
635    /// # use h2::server::*;
636    /// #
637    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
638    /// # -> Handshake<T>
639    /// # {
640    /// // `server_fut` is a future representing the completion of the HTTP/2
641    /// // handshake.
642    /// let server_fut = Builder::new()
643    ///     .initial_window_size(1_000_000)
644    ///     .max_concurrent_streams(1000)
645    ///     .handshake(my_io);
646    /// # server_fut
647    /// # }
648    /// #
649    /// # pub fn main() {}
650    /// ```
651    pub fn new() -> Builder {
652        Builder {
653            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
654            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
655            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
656            settings: Settings::default(),
657            initial_target_connection_window_size: None,
658            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
659
660            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
661        }
662    }
663
    /// Indicates the initial window size (in octets) for stream-level
    /// flow control for received data.
    ///
    /// The initial window of a stream is used as part of flow control. For more
    /// details, see [`FlowControl`].
    ///
    /// The value is recorded in the initial `SETTINGS` frame that is sent as
    /// part of the handshake.
    ///
    /// The default value is 65,535.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .initial_window_size(1_000_000)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
        self.settings.set_initial_window_size(Some(size));
        self
    }
697
    /// Indicates the initial window size (in octets) for connection-level flow control
    /// for received data.
    ///
    /// The initial window of a connection is used as part of flow control. For more details,
    /// see [`FlowControl`].
    ///
    /// The default value is 65,535.
    ///
    /// [`FlowControl`]: ../struct.FlowControl.html
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .initial_connection_window_size(1_000_000)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
        // Kept on the builder itself rather than in the SETTINGS frame.
        self.initial_target_connection_window_size = Some(size);
        self
    }
731
    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
    /// configured server is able to accept.
    ///
    /// The sender may send data frames that are **smaller** than this value,
    /// but any data larger than `max` will be broken up into multiple `DATA`
    /// frames.
    ///
    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
    ///
    /// The value is recorded in the initial `SETTINGS` frame and is also
    /// applied to the codec as its maximum receive frame size when the
    /// handshake is started.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_frame_size(1_000_000)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if `max` is not within the legal range specified
    /// above.
    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
        // NOTE(review): the range check is not visible here; presumably it
        // lives in `Settings::set_max_frame_size` — confirm.
        self.settings.set_max_frame_size(Some(max));
        self
    }
769
    /// Sets the max size of received header frames.
    ///
    /// This advisory setting informs a peer of the maximum size of header list
    /// that the sender is prepared to accept, in octets. The value is based on
    /// the uncompressed size of header fields, including the length of the name
    /// and value in octets plus an overhead of 32 octets for each header field.
    ///
    /// This setting is also used to limit the maximum amount of data that is
    /// buffered to decode HEADERS frames: when the handshake is started, the
    /// value is applied to the codec as its maximum receive header list size.
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_header_list_size(16 * 1024)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_header_list_size(Some(max));
        self
    }
803
    /// Sets the maximum number of concurrent streams.
    ///
    /// The maximum concurrent streams setting only controls the maximum number
    /// of streams that can be initiated by the remote peer. In other words,
    /// when this setting is set to 100, this does not limit the number of
    /// concurrent streams that can be created by the caller.
    ///
    /// It is recommended that this value be no smaller than 100, so as to not
    /// unnecessarily limit parallelism. However, any value is legal, including
    /// 0. If `max` is set to 0, then the remote will not be permitted to
    /// initiate streams.
    ///
    /// Note that streams in the reserved state, i.e., push promises that have
    /// been reserved but the stream has not started, do not count against this
    /// setting.
    ///
    /// Also note that if the remote *does* exceed the value set here, it is not
    /// a protocol level error. Instead, the `h2` library will immediately reset
    /// the stream.
    ///
    /// The value is recorded in the initial `SETTINGS` frame that is sent as
    /// part of the handshake.
    ///
    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
    ///
    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
    ///
    /// # Examples
    ///
    /// ```
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # use h2::server::*;
    /// #
    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
    /// # -> Handshake<T>
    /// # {
    /// // `server_fut` is a future representing the completion of the HTTP/2
    /// // handshake.
    /// let server_fut = Builder::new()
    ///     .max_concurrent_streams(1000)
    ///     .handshake(my_io);
    /// # server_fut
    /// # }
    /// #
    /// # pub fn main() {}
    /// ```
    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
        self.settings.set_max_concurrent_streams(Some(max));
        self
    }
851
852    /// Sets the maximum number of concurrent locally reset streams.
853    ///
854    /// When a stream is explicitly reset by either calling
855    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
856    /// before completing the stream, the HTTP/2 specification requires that
857    /// any further frames received for that stream must be ignored for "some
858    /// time".
859    ///
860    /// In order to satisfy the specification, internal state must be maintained
861    /// to implement the behavior. This state grows linearly with the number of
862    /// streams that are locally reset.
863    ///
    /// The `max_concurrent_reset_streams` setting configures an upper
865    /// bound on the amount of state that is maintained. When this max value is
866    /// reached, the oldest reset stream is purged from memory.
867    ///
868    /// Once the stream has been fully purged from memory, any additional frames
869    /// received for that stream will result in a connection level protocol
870    /// error, forcing the connection to terminate.
871    ///
872    /// The default value is 10.
873    ///
874    /// # Examples
875    ///
876    /// ```
877    /// # use tokio::io::{AsyncRead, AsyncWrite};
878    /// # use h2::server::*;
879    /// #
880    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
881    /// # -> Handshake<T>
882    /// # {
883    /// // `server_fut` is a future representing the completion of the HTTP/2
884    /// // handshake.
885    /// let server_fut = Builder::new()
886    ///     .max_concurrent_reset_streams(1000)
887    ///     .handshake(my_io);
888    /// # server_fut
889    /// # }
890    /// #
891    /// # pub fn main() {}
892    /// ```
893    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
894        self.reset_stream_max = max;
895        self
896    }
897
898    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
899    ///
900    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DoS servers.
    /// This limit protects against these DoS attacks by limiting the number of resets we can be forced to generate.
    ///
    /// When the number of local resets exceeds this threshold, the server will issue GOAWAYs with an error code of
    /// `ENHANCE_YOUR_CALM` to the client.
    ///
    /// If you really want to disable this, supply [`Option::None`] here.
    /// Disabling this is not recommended and may expose you to DoS attacks.
909    ///
910    /// The default value is currently 1024, but could change.
911    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
912        self.local_max_error_reset_streams = max;
913        self
914    }
915
916    /// Sets the maximum number of pending-accept remotely-reset streams.
917    ///
918    /// Streams that have been received by the peer, but not accepted by the
919    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
920    /// could send a request and then shortly after, realize it is not needed,
921    /// sending a CANCEL.
922    ///
923    /// However, since those streams are now "closed", they don't count towards
924    /// the max concurrent streams. So, they will sit in the accept queue,
925    /// using memory.
926    ///
927    /// When the number of remotely-reset streams sitting in the pending-accept
928    /// queue reaches this maximum value, a connection error with the code of
929    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
930    /// `Future`.
931    ///
932    /// The default value is currently 20, but could change.
933    ///
934    /// # Examples
935    ///
936    ///
937    /// ```
938    /// # use tokio::io::{AsyncRead, AsyncWrite};
939    /// # use h2::server::*;
940    /// #
941    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
942    /// # -> Handshake<T>
943    /// # {
944    /// // `server_fut` is a future representing the completion of the HTTP/2
945    /// // handshake.
946    /// let server_fut = Builder::new()
947    ///     .max_pending_accept_reset_streams(100)
948    ///     .handshake(my_io);
949    /// # server_fut
950    /// # }
951    /// #
952    /// # pub fn main() {}
953    /// ```
954    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
955        self.pending_accept_reset_stream_max = max;
956        self
957    }
958
959    /// Sets the maximum send buffer size per stream.
960    ///
961    /// Once a stream has buffered up to (or over) the maximum, the stream's
962    /// flow control will not "poll" additional capacity. Once bytes for the
963    /// stream have been written to the connection, the send buffer capacity
964    /// will be freed up again.
965    ///
966    /// The default is currently ~400KB, but may change.
967    ///
968    /// # Panics
969    ///
970    /// This function panics if `max` is larger than `u32::MAX`.
971    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
972        assert!(max <= std::u32::MAX as usize);
973        self.max_send_buffer_size = max;
974        self
975    }
976
    /// Sets the duration to remember locally reset streams.
978    ///
979    /// When a stream is explicitly reset by either calling
980    /// [`SendResponse::send_reset`] or by dropping a [`SendResponse`] instance
981    /// before completing the stream, the HTTP/2 specification requires that
982    /// any further frames received for that stream must be ignored for "some
983    /// time".
984    ///
985    /// In order to satisfy the specification, internal state must be maintained
986    /// to implement the behavior. This state grows linearly with the number of
987    /// streams that are locally reset.
988    ///
989    /// The `reset_stream_duration` setting configures the max amount of time
990    /// this state will be maintained in memory. Once the duration elapses, the
991    /// stream state is purged from memory.
992    ///
993    /// Once the stream has been fully purged from memory, any additional frames
994    /// received for that stream will result in a connection level protocol
995    /// error, forcing the connection to terminate.
996    ///
997    /// The default value is 30 seconds.
998    ///
999    /// # Examples
1000    ///
1001    /// ```
1002    /// # use tokio::io::{AsyncRead, AsyncWrite};
1003    /// # use h2::server::*;
1004    /// # use std::time::Duration;
1005    /// #
1006    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1007    /// # -> Handshake<T>
1008    /// # {
1009    /// // `server_fut` is a future representing the completion of the HTTP/2
1010    /// // handshake.
1011    /// let server_fut = Builder::new()
1012    ///     .reset_stream_duration(Duration::from_secs(10))
1013    ///     .handshake(my_io);
1014    /// # server_fut
1015    /// # }
1016    /// #
1017    /// # pub fn main() {}
1018    /// ```
1019    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
1020        self.reset_stream_duration = dur;
1021        self
1022    }
1023
1024    /// Enables the [extended CONNECT protocol].
1025    ///
1026    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
1027    pub fn enable_connect_protocol(&mut self) -> &mut Self {
1028        self.settings.set_enable_connect_protocol(Some(1));
1029        self
1030    }
1031
1032    /// Creates a new configured HTTP/2 server backed by `io`.
1033    ///
1034    /// It is expected that `io` already be in an appropriate state to commence
1035    /// the [HTTP/2 handshake]. See [Handshake] for more details.
1036    ///
1037    /// Returns a future which resolves to the [`Connection`] instance once the
1038    /// HTTP/2 handshake has been completed.
1039    ///
1040    /// This function also allows the caller to configure the send payload data
1041    /// type. See [Outbound data type] for more details.
1042    ///
1043    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1044    /// [Handshake]: ../index.html#handshake
1045    /// [`Connection`]: struct.Connection.html
    /// [Outbound data type]: ../index.html#outbound-data-type
1047    ///
1048    /// # Examples
1049    ///
1050    /// Basic usage:
1051    ///
1052    /// ```
1053    /// # use tokio::io::{AsyncRead, AsyncWrite};
1054    /// # use h2::server::*;
1055    /// #
1056    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1057    /// # -> Handshake<T>
1058    /// # {
1059    /// // `server_fut` is a future representing the completion of the HTTP/2
1060    /// // handshake.
1061    /// let server_fut = Builder::new()
1062    ///     .handshake(my_io);
1063    /// # server_fut
1064    /// # }
1065    /// #
1066    /// # pub fn main() {}
1067    /// ```
1068    ///
1069    /// Configures the send-payload data type. In this case, the outbound data
1070    /// type will be `&'static [u8]`.
1071    ///
1072    /// ```
1073    /// # use tokio::io::{AsyncRead, AsyncWrite};
1074    /// # use h2::server::*;
1075    /// #
1076    /// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1077    /// # -> Handshake<T, &'static [u8]>
1078    /// # {
1079    /// // `server_fut` is a future representing the completion of the HTTP/2
1080    /// // handshake.
1081    /// let server_fut: Handshake<_, &'static [u8]> = Builder::new()
1082    ///     .handshake(my_io);
1083    /// # server_fut
1084    /// # }
1085    /// #
1086    /// # pub fn main() {}
1087    /// ```
1088    pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
1089    where
1090        T: AsyncRead + AsyncWrite + Unpin,
1091        B: Buf,
1092    {
1093        Connection::handshake2(io, self.clone())
1094    }
1095}
1096
1097impl Default for Builder {
1098    fn default() -> Builder {
1099        Builder::new()
1100    }
1101}
1102
1103// ===== impl SendResponse =====
1104
1105impl<B: Buf> SendResponse<B> {
1106    /// Send a response to a client request.
1107    ///
1108    /// On success, a [`SendStream`] instance is returned. This instance can be
1109    /// used to stream the response body and send trailers.
1110    ///
1111    /// If a body or trailers will be sent on the returned [`SendStream`]
1112    /// instance, then `end_of_stream` must be set to `false` when calling this
1113    /// function.
1114    ///
1115    /// The [`SendResponse`] instance is already associated with a received
1116    /// request.  This function may only be called once per instance and only if
1117    /// [`send_reset`] has not been previously called.
1118    ///
1119    /// [`SendResponse`]: #
1120    /// [`SendStream`]: ../struct.SendStream.html
1121    /// [`send_reset`]: #method.send_reset
1122    pub fn send_response(
1123        &mut self,
1124        response: Response<()>,
1125        end_of_stream: bool,
1126    ) -> Result<SendStream<B>, crate::Error> {
1127        self.inner
1128            .send_response(response, end_of_stream)
1129            .map(|_| SendStream::new(self.inner.clone()))
1130            .map_err(Into::into)
1131    }
1132
1133    /// Push a request and response to the client
1134    ///
1135    /// On success, a [`SendResponse`] instance is returned.
1136    ///
1137    /// [`SendResponse`]: #
1138    pub fn push_request(
1139        &mut self,
1140        request: Request<()>,
1141    ) -> Result<SendPushedResponse<B>, crate::Error> {
1142        self.inner
1143            .send_push_promise(request)
1144            .map(|inner| SendPushedResponse {
1145                inner: SendResponse { inner },
1146            })
1147            .map_err(Into::into)
1148    }
1149
1150    /// Send a stream reset to the peer.
1151    ///
1152    /// This essentially cancels the stream, including any inbound or outbound
1153    /// data streams.
1154    ///
1155    /// If this function is called before [`send_response`], a call to
1156    /// [`send_response`] will result in an error.
1157    ///
1158    /// If this function is called while a [`SendStream`] instance is active,
1159    /// any further use of the instance will result in an error.
1160    ///
1161    /// This function should only be called once.
1162    ///
1163    /// [`send_response`]: #method.send_response
1164    /// [`SendStream`]: ../struct.SendStream.html
1165    pub fn send_reset(&mut self, reason: Reason) {
1166        self.inner.send_reset(reason)
1167    }
1168
1169    /// Polls to be notified when the client resets this stream.
1170    ///
1171    /// If stream is still open, this returns `Poll::Pending`, and
1172    /// registers the task to be notified if a `RST_STREAM` is received.
1173    ///
1174    /// If a `RST_STREAM` frame is received for this stream, calling this
1175    /// method will yield the `Reason` for the reset.
1176    ///
1177    /// # Error
1178    ///
1179    /// Calling this method after having called `send_response` will return
1180    /// a user error.
1181    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1182        self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders)
1183    }
1184
1185    /// Returns the stream ID of the response stream.
1186    ///
1187    /// # Panics
1188    ///
1189    /// If the lock on the stream store has been poisoned.
1190    pub fn stream_id(&self) -> crate::StreamId {
1191        crate::StreamId::from_internal(self.inner.stream_id())
1192    }
1193}
1194
1195// ===== impl SendPushedResponse =====
1196
1197impl<B: Buf> SendPushedResponse<B> {
1198    /// Send a response to a promised request.
1199    ///
1200    /// On success, a [`SendStream`] instance is returned. This instance can be
1201    /// used to stream the response body and send trailers.
1202    ///
1203    /// If a body or trailers will be sent on the returned [`SendStream`]
1204    /// instance, then `end_of_stream` must be set to `false` when calling this
1205    /// function.
1206    ///
1207    /// The [`SendPushedResponse`] instance is associated with a promised
1208    /// request.  This function may only be called once per instance and only if
1209    /// [`send_reset`] has not been previously called.
1210    ///
1211    /// [`SendPushedResponse`]: #
1212    /// [`SendStream`]: ../struct.SendStream.html
1213    /// [`send_reset`]: #method.send_reset
1214    pub fn send_response(
1215        &mut self,
1216        response: Response<()>,
1217        end_of_stream: bool,
1218    ) -> Result<SendStream<B>, crate::Error> {
1219        self.inner.send_response(response, end_of_stream)
1220    }
1221
1222    /// Send a stream reset to the peer.
1223    ///
1224    /// This essentially cancels the stream, including any inbound or outbound
1225    /// data streams.
1226    ///
1227    /// If this function is called before [`send_response`], a call to
1228    /// [`send_response`] will result in an error.
1229    ///
1230    /// If this function is called while a [`SendStream`] instance is active,
1231    /// any further use of the instance will result in an error.
1232    ///
1233    /// This function should only be called once.
1234    ///
1235    /// [`send_response`]: #method.send_response
1236    /// [`SendStream`]: ../struct.SendStream.html
1237    pub fn send_reset(&mut self, reason: Reason) {
1238        self.inner.send_reset(reason)
1239    }
1240
1241    /// Polls to be notified when the client resets this stream.
1242    ///
1243    /// If stream is still open, this returns `Poll::Pending`, and
1244    /// registers the task to be notified if a `RST_STREAM` is received.
1245    ///
1246    /// If a `RST_STREAM` frame is received for this stream, calling this
1247    /// method will yield the `Reason` for the reset.
1248    ///
1249    /// # Error
1250    ///
1251    /// Calling this method after having called `send_response` will return
1252    /// a user error.
1253    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
1254        self.inner.poll_reset(cx)
1255    }
1256
1257    /// Returns the stream ID of the response stream.
1258    ///
1259    /// # Panics
1260    ///
1261    /// If the lock on the stream store has been poisoned.
1262    pub fn stream_id(&self) -> crate::StreamId {
1263        self.inner.stream_id()
1264    }
1265}
1266
1267// ===== impl Flush =====
1268
1269impl<T, B: Buf> Flush<T, B> {
1270    fn new(codec: Codec<T, B>) -> Self {
1271        Flush { codec: Some(codec) }
1272    }
1273}
1274
1275impl<T, B> Future for Flush<T, B>
1276where
1277    T: AsyncWrite + Unpin,
1278    B: Buf,
1279{
1280    type Output = Result<Codec<T, B>, crate::Error>;
1281
1282    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1283        // Flush the codec
1284        ready!(self.codec.as_mut().unwrap().flush(cx)).map_err(crate::Error::from_io)?;
1285
1286        // Return the codec
1287        Poll::Ready(Ok(self.codec.take().unwrap()))
1288    }
1289}
1290
1291impl<T, B: Buf> ReadPreface<T, B> {
1292    fn new(codec: Codec<T, B>) -> Self {
1293        ReadPreface {
1294            codec: Some(codec),
1295            pos: 0,
1296        }
1297    }
1298
1299    fn inner_mut(&mut self) -> &mut T {
1300        self.codec.as_mut().unwrap().get_mut()
1301    }
1302}
1303
1304impl<T, B> Future for ReadPreface<T, B>
1305where
1306    T: AsyncRead + Unpin,
1307    B: Buf,
1308{
1309    type Output = Result<Codec<T, B>, crate::Error>;
1310
1311    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1312        let mut buf = [0; 24];
1313        let mut rem = PREFACE.len() - self.pos;
1314
1315        while rem > 0 {
1316            let mut buf = ReadBuf::new(&mut buf[..rem]);
1317            ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
1318                .map_err(crate::Error::from_io)?;
1319            let n = buf.filled().len();
1320            if n == 0 {
1321                return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
1322                    io::ErrorKind::UnexpectedEof,
1323                    "connection closed before reading preface",
1324                ))));
1325            }
1326
1327            if &PREFACE[self.pos..self.pos + n] != buf.filled() {
1328                proto_err!(conn: "read_preface: invalid preface");
1329                // TODO: Should this just write the GO_AWAY frame directly?
1330                return Poll::Ready(Err(Error::library_go_away(Reason::PROTOCOL_ERROR).into()));
1331            }
1332
1333            self.pos += n;
1334            rem -= n; // TODO test
1335        }
1336
1337        Poll::Ready(Ok(self.codec.take().unwrap()))
1338    }
1339}
1340
1341// ===== impl Handshake =====
1342
1343impl<T, B: Buf> Future for Handshake<T, B>
1344where
1345    T: AsyncRead + AsyncWrite + Unpin,
1346    B: Buf,
1347{
1348    type Output = Result<Connection<T, B>, crate::Error>;
1349
1350    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1351        let span = self.span.clone(); // XXX(eliza): T_T
1352        let _e = span.enter();
1353        tracing::trace!(state = ?self.state);
1354
1355        loop {
1356            match &mut self.state {
1357                Handshaking::Flushing(flush) => {
1358                    // We're currently flushing a pending SETTINGS frame. Poll the
1359                    // flush future, and, if it's completed, advance our state to wait
1360                    // for the client preface.
1361                    let codec = match Pin::new(flush).poll(cx)? {
1362                        Poll::Pending => {
1363                            tracing::trace!(flush.poll = %"Pending");
1364                            return Poll::Pending;
1365                        }
1366                        Poll::Ready(flushed) => {
1367                            tracing::trace!(flush.poll = %"Ready");
1368                            flushed
1369                        }
1370                    };
1371                    self.state = Handshaking::ReadingPreface(
1372                        ReadPreface::new(codec).instrument(tracing::trace_span!("read_preface")),
1373                    );
1374                }
1375                Handshaking::ReadingPreface(read) => {
1376                    let codec = ready!(Pin::new(read).poll(cx)?);
1377
1378                    self.state = Handshaking::Done;
1379
1380                    let connection = proto::Connection::new(
1381                        codec,
1382                        Config {
1383                            next_stream_id: 2.into(),
1384                            // Server does not need to locally initiate any streams
1385                            initial_max_send_streams: 0,
1386                            max_send_buffer_size: self.builder.max_send_buffer_size,
1387                            reset_stream_duration: self.builder.reset_stream_duration,
1388                            reset_stream_max: self.builder.reset_stream_max,
1389                            remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
1390                            local_error_reset_streams_max: self
1391                                .builder
1392                                .local_max_error_reset_streams,
1393                            settings: self.builder.settings.clone(),
1394                        },
1395                    );
1396
1397                    tracing::trace!("connection established!");
1398                    let mut c = Connection { connection };
1399                    if let Some(sz) = self.builder.initial_target_connection_window_size {
1400                        c.set_target_window_size(sz);
1401                    }
1402
1403                    return Poll::Ready(Ok(c));
1404                }
1405                Handshaking::Done => {
1406                    panic!("Handshaking::poll() called again after handshaking was complete")
1407                }
1408            }
1409        }
1410    }
1411}
1412
1413impl<T, B> fmt::Debug for Handshake<T, B>
1414where
1415    T: AsyncRead + AsyncWrite + fmt::Debug,
1416    B: fmt::Debug + Buf,
1417{
1418    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1419        write!(fmt, "server::Handshake")
1420    }
1421}
1422
1423impl Peer {
1424    pub fn convert_send_message(
1425        id: StreamId,
1426        response: Response<()>,
1427        end_of_stream: bool,
1428    ) -> frame::Headers {
1429        use http::response::Parts;
1430
1431        // Extract the components of the HTTP request
1432        let (
1433            Parts {
1434                status, headers, ..
1435            },
1436            _,
1437        ) = response.into_parts();
1438
1439        // Build the set pseudo header set. All requests will include `method`
1440        // and `path`.
1441        let pseudo = Pseudo::response(status);
1442
1443        // Create the HEADERS frame
1444        let mut frame = frame::Headers::new(id, pseudo, headers);
1445
1446        if end_of_stream {
1447            frame.set_end_stream()
1448        }
1449
1450        frame
1451    }
1452
1453    pub fn convert_push_message(
1454        stream_id: StreamId,
1455        promised_id: StreamId,
1456        request: Request<()>,
1457    ) -> Result<frame::PushPromise, UserError> {
1458        use http::request::Parts;
1459
1460        if let Err(e) = frame::PushPromise::validate_request(&request) {
1461            use PushPromiseHeaderError::*;
1462            match e {
1463                NotSafeAndCacheable => tracing::debug!(
1464                    ?promised_id,
1465                    "convert_push_message: method {} is not safe and cacheable",
1466                    request.method(),
1467                ),
1468                InvalidContentLength(e) => tracing::debug!(
1469                    ?promised_id,
1470                    "convert_push_message; promised request has invalid content-length {:?}",
1471                    e,
1472                ),
1473            }
1474            return Err(UserError::MalformedHeaders);
1475        }
1476
1477        // Extract the components of the HTTP request
1478        let (
1479            Parts {
1480                method,
1481                uri,
1482                headers,
1483                ..
1484            },
1485            _,
1486        ) = request.into_parts();
1487
1488        let pseudo = Pseudo::request(method, uri, None);
1489
1490        Ok(frame::PushPromise::new(
1491            stream_id,
1492            promised_id,
1493            pseudo,
1494            headers,
1495        ))
1496    }
1497}
1498
1499impl proto::Peer for Peer {
1500    type Poll = Request<()>;
1501
1502    const NAME: &'static str = "Server";
1503
1504    /*
1505    fn is_server() -> bool {
1506        true
1507    }
1508    */
1509
1510    fn r#dyn() -> proto::DynPeer {
1511        proto::DynPeer::Server
1512    }
1513
1514    fn convert_poll_message(
1515        pseudo: Pseudo,
1516        fields: HeaderMap,
1517        stream_id: StreamId,
1518    ) -> Result<Self::Poll, Error> {
1519        use http::{uri, Version};
1520
1521        let mut b = Request::builder();
1522
1523        macro_rules! malformed {
1524            ($($arg:tt)*) => {{
1525                tracing::debug!($($arg)*);
1526                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1527            }}
1528        }
1529
1530        b = b.version(Version::HTTP_2);
1531
1532        let is_connect;
1533        if let Some(method) = pseudo.method {
1534            is_connect = method == Method::CONNECT;
1535            b = b.method(method);
1536        } else {
1537            malformed!("malformed headers: missing method");
1538        }
1539
1540        let has_protocol = pseudo.protocol.is_some();
1541        if has_protocol {
1542            if is_connect {
1543                // Assert that we have the right type.
1544                b = b.extension::<crate::ext::Protocol>(pseudo.protocol.unwrap());
1545            } else {
1546                malformed!("malformed headers: :protocol on non-CONNECT request");
1547            }
1548        }
1549
1550        if pseudo.status.is_some() {
1551            malformed!("malformed headers: :status field on request");
1552        }
1553
1554        // Convert the URI
1555        let mut parts = uri::Parts::default();
1556
1557        // A request translated from HTTP/1 must not include the :authority
1558        // header
1559        if let Some(authority) = pseudo.authority {
1560            let maybe_authority = uri::Authority::from_maybe_shared(authority.clone().into_inner());
1561            parts.authority = Some(maybe_authority.or_else(|why| {
1562                malformed!(
1563                    "malformed headers: malformed authority ({:?}): {}",
1564                    authority,
1565                    why,
1566                )
1567            })?);
1568        }
1569
1570        // A :scheme is required, except CONNECT.
1571        if let Some(scheme) = pseudo.scheme {
1572            if is_connect && !has_protocol {
1573                malformed!("malformed headers: :scheme in CONNECT");
1574            }
1575            let maybe_scheme = scheme.parse();
1576            let scheme = maybe_scheme.or_else(|why| {
1577                malformed!(
1578                    "malformed headers: malformed scheme ({:?}): {}",
1579                    scheme,
1580                    why,
1581                )
1582            })?;
1583
1584            // It's not possible to build an `Uri` from a scheme and path. So,
1585            // after validating is was a valid scheme, we just have to drop it
1586            // if there isn't an :authority.
1587            if parts.authority.is_some() {
1588                parts.scheme = Some(scheme);
1589            }
1590        } else if !is_connect || has_protocol {
1591            malformed!("malformed headers: missing scheme");
1592        }
1593
1594        if let Some(path) = pseudo.path {
1595            if is_connect && !has_protocol {
1596                malformed!("malformed headers: :path in CONNECT");
1597            }
1598
1599            // This cannot be empty
1600            if path.is_empty() {
1601                malformed!("malformed headers: missing path");
1602            }
1603
1604            let maybe_path = uri::PathAndQuery::from_maybe_shared(path.clone().into_inner());
1605            parts.path_and_query = Some(maybe_path.or_else(|why| {
1606                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
1607            })?);
1608        } else if is_connect && has_protocol {
1609            malformed!("malformed headers: missing path in extended CONNECT");
1610        }
1611
1612        b = b.uri(parts);
1613
1614        let mut request = match b.body(()) {
1615            Ok(request) => request,
1616            Err(e) => {
1617                // TODO: Should there be more specialized handling for different
1618                // kinds of errors
1619                proto_err!(stream: "error building request: {}; stream={:?}", e, stream_id);
1620                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1621            }
1622        };
1623
1624        *request.headers_mut() = fields;
1625
1626        Ok(request)
1627    }
1628}
1629
1630// ===== impl Handshaking =====
1631
1632impl<T, B> fmt::Debug for Handshaking<T, B>
1633where
1634    B: Buf,
1635{
1636    #[inline]
1637    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
1638        match *self {
1639            Handshaking::Flushing(_) => f.write_str("Flushing(_)"),
1640            Handshaking::ReadingPreface(_) => f.write_str("ReadingPreface(_)"),
1641            Handshaking::Done => f.write_str("Done"),
1642        }
1643    }
1644}