hyper/server/
conn.rs

1//! Lower-level Server connection API.
2//!
//! The types in this module provide a lower-level API based around a
//! single connection. Accepting a connection and binding it with a service
//! are not handled at this level. This module provides the building blocks to
//! customize those things externally.
7//!
//! If you don't need to manage connections yourself, consider using the
//! higher-level [Server](super) API.
10//!
11//! ## Example
12//! A simple example that uses the `Http` struct to talk HTTP over a Tokio TCP stream
13//! ```no_run
14//! # #[cfg(all(feature = "http1", feature = "runtime"))]
15//! # mod rt {
16//! use http::{Request, Response, StatusCode};
17//! use hyper::{server::conn::Http, service::service_fn, Body};
18//! use std::{net::SocketAddr, convert::Infallible};
19//! use tokio::net::TcpListener;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
23//!     let addr: SocketAddr = ([127, 0, 0, 1], 8080).into();
24//!
25//!     let mut tcp_listener = TcpListener::bind(addr).await?;
26//!     loop {
27//!         let (tcp_stream, _) = tcp_listener.accept().await?;
28//!         tokio::task::spawn(async move {
29//!             if let Err(http_err) = Http::new()
30//!                     .http1_only(true)
31//!                     .http1_keep_alive(true)
32//!                     .serve_connection(tcp_stream, service_fn(hello))
33//!                     .await {
34//!                 eprintln!("Error while serving HTTP connection: {}", http_err);
35//!             }
36//!         });
37//!     }
38//! }
39//!
40//! async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
41//!    Ok(Response::new(Body::from("Hello World!")))
42//! }
43//! # }
44//! ```
45
46#[cfg(all(
47    any(feature = "http1", feature = "http2"),
48    not(all(feature = "http1", feature = "http2"))
49))]
50use std::marker::PhantomData;
51#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
52use std::time::Duration;
53
54#[cfg(feature = "http2")]
55use crate::common::io::Rewind;
56#[cfg(all(feature = "http1", feature = "http2"))]
57use crate::error::{Kind, Parse};
58#[cfg(feature = "http1")]
59use crate::upgrade::Upgraded;
60
61#[cfg(all(feature = "backports", feature = "http1"))]
62pub mod http1;
63#[cfg(all(feature = "backports", feature = "http2"))]
64pub mod http2;
65
66cfg_feature! {
67    #![any(feature = "http1", feature = "http2")]
68
69    use std::error::Error as StdError;
70    use std::fmt;
71    use std::task::{Context, Poll};
72    use std::pin::Pin;
73    use std::future::Future;
74    use std::marker::Unpin;
75    #[cfg(not(all(feature = "http1", feature = "http2")))]
76    use std::convert::Infallible;
77
78    use bytes::Bytes;
79    use pin_project_lite::pin_project;
80    use tokio::io::{AsyncRead, AsyncWrite};
81    use tracing::trace;
82
83    pub use super::server::Connecting;
84    use crate::body::{Body, HttpBody};
85    use crate::common::exec::{ConnStreamExec, Exec};
86    use crate::proto;
87    use crate::service::HttpService;
88
89    pub(super) use self::upgrades::UpgradeableConnection;
90}
91
92#[cfg(feature = "tcp")]
93pub use super::tcp::{AddrIncoming, AddrStream};
94
/// A lower-level configuration of the HTTP protocol.
///
/// This structure is used to configure options for an HTTP server connection.
/// The setters only record values; they take effect when a connection is
/// bound with `serve_connection`.
///
/// If you don't need to manage connections yourself, consider using the
/// higher-level [Server](super) API.
#[derive(Clone, Debug)]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[cfg_attr(
    feature = "deprecated",
    deprecated(
        note = "This struct will be replaced with `server::conn::http1::Builder` and `server::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now."
    )
)]
pub struct Http<E = Exec> {
    // Executor cloned into the HTTP/2 server (and into the h1 -> h2 fallback
    // state) by `serve_connection`; replaced wholesale by `with_executor`.
    pub(crate) exec: E,
    // HTTP/1: allow half-closed connections (see `http1_half_close`).
    h1_half_close: bool,
    // HTTP/1: keep-alive stays enabled unless this is `false`.
    h1_keep_alive: bool,
    // HTTP/1: write header names in title case.
    h1_title_case_headers: bool,
    // HTTP/1: preserve the original header casing.
    h1_preserve_header_case: bool,
    // HTTP/1: optional deadline for reading the request headers.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    h1_header_read_timeout: Option<Duration>,
    // HTTP/1 write strategy: `Some(true)` selects the queue strategy,
    // `Some(false)` the flatten strategy, `None` leaves the connection's own
    // choice untouched.
    h1_writev: Option<bool>,
    // Settings handed to the HTTP/2 server.
    #[cfg(feature = "http2")]
    h2_builder: proto::h2::server::Config,
    // Which protocol `serve_connection` starts with.
    mode: ConnectionMode,
    // HTTP/1: optional override of the connection's maximum buffer size.
    max_buf_size: Option<usize>,
    // HTTP/1: forwarded to the connection's `set_flush_pipeline`.
    pipeline_flush: bool,
}
125
/// The internal protocol mode of an `Http` configuration.
///
/// It decides which protocol a new connection starts with and whether an
/// HTTP/1 connection may switch to HTTP/2 when a parse error occurs.
/// Variants only exist when the matching cargo features are enabled.
// NOTE(review): `Http::new` relies on a `Default` impl for this enum that is
// defined outside this part of the file; which variant it picks is not
// visible here.
#[cfg(any(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug, PartialEq)]
enum ConnectionMode {
    /// Always use HTTP/1 and do not upgrade when a parse error occurs.
    #[cfg(feature = "http1")]
    H1Only,
    /// Always use HTTP/2.
    #[cfg(feature = "http2")]
    H2Only,
    /// Use HTTP/1 and try to upgrade to h2 when a parse error occurs.
    #[cfg(all(feature = "http1", feature = "http2"))]
    Fallback,
}
140
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    /// A future binding a connection with a Service.
    ///
    /// Polling this future will drive HTTP forward.
    #[must_use = "futures do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
    pub struct Connection<T, S, E = Exec>
    where
        S: HttpService<Body>,
    {
        // The active protocol state machine. Kept in an `Option` so it can be
        // moved out by value (`try_into_parts`, and the h1 -> h2 upgrade which
        // `take`s it).
        pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
        // Whether (and with which settings) an HTTP/1 connection may be
        // upgraded to HTTP/2; a zero-sized marker when only one protocol is
        // compiled in.
        fallback: Fallback<E>,
    }
}
156
// Concrete HTTP/1 dispatcher type used by `ProtoServer::H1`.
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, S> =
    proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;

// Stand-in used when HTTP/1 is compiled out. The `Infallible` member makes
// the type uninhabited, which is why the `H1` arms elsewhere can be written
// as `match h1.0 {}`. The `PhantomData` only keeps the type parameters used.
#[cfg(all(not(feature = "http1"), feature = "http2"))]
type Http1Dispatcher<T, B, S> = (Infallible, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);

// Concrete HTTP/2 server type used by `ProtoServer::H2`; the IO is wrapped in
// `Rewind`.
#[cfg(feature = "http2")]
type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;

// Uninhabited stand-in used when HTTP/2 is compiled out (same trick as the
// HTTP/1 placeholder above).
#[cfg(all(not(feature = "http2"), feature = "http1"))]
type Http2Server<T, B, S, E> = (
    Infallible,
    PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
);
172
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
    // The protocol-specific state machine driving a single connection.
    //
    // Both variants always exist; when a protocol's cargo feature is
    // disabled, its payload type is an uninhabited placeholder, so that
    // variant can never be constructed.
    #[project = ProtoServerProj]
    pub(super) enum ProtoServer<T, B, S, E = Exec>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // HTTP/1 dispatcher.
        H1 {
            #[pin]
            h1: Http1Dispatcher<T, B, S>,
        },
        // HTTP/2 server.
        H2 {
            #[pin]
            h2: Http2Server<T, B, S, E>,
        },
    }
}
191
// Upgrade policy stored on a `Connection` when both protocols are compiled in.
#[cfg(all(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug)]
enum Fallback<E> {
    // The connection may switch to HTTP/2; carries the HTTP/2 settings and
    // the executor that the upgrade needs.
    ToHttp2(proto::h2::server::Config, E),
    // The connection stays on HTTP/1.
    Http1Only,
}
198
// With exactly one protocol compiled in there is nothing to fall back to, so
// `Fallback` collapses to a zero-sized marker that merely keeps `E` used.
#[cfg(all(
    any(feature = "http1", feature = "http2"),
    not(all(feature = "http1", feature = "http2"))
))]
type Fallback<E> = PhantomData<E>;
204
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Fallback<E> {
    /// Returns `true` when this policy permits switching the connection to
    /// HTTP/2, i.e. for the `ToHttp2` variant, and `false` for `Http1Only`.
    fn to_h2(&self) -> bool {
        matches!(self, Fallback::ToHttp2(..))
    }
}
214
// `Fallback` is declared `Unpin` unconditionally, independent of whether the
// executor type `E` is `Unpin`.
// NOTE(review): presumably this keeps `Connection` `Unpin` for any executor —
// confirm against the `Future` impl for `Connection`.
#[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Unpin for Fallback<E> {}
217
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
///
/// Values of this type are produced by `Connection::into_parts` and
/// `Connection::try_into_parts`.
#[derive(Debug)]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[cfg_attr(
    feature = "deprecated",
    deprecated(
        note = "This struct will be replaced with `server::conn::http1::Parts` in 1.0, enable the \"backports\" feature to use them now."
    )
)]
pub struct Parts<T, S> {
    /// The original IO object used in the handshake.
    pub io: T,
    /// A buffer of bytes that have been read but not processed as HTTP.
    ///
    /// If the client sent additional bytes after its last request, and
    /// this connection "ended" with an upgrade, the read buffer will contain
    /// those bytes.
    ///
    /// You will want to check for any existing bytes if you plan to continue
    /// communicating on the IO object.
    pub read_buf: Bytes,
    /// The `Service` used to serve this connection.
    pub service: S,
    // Private unit field: keeps the struct from being built with a struct
    // literal outside this module.
    _inner: (),
}
247
248// ===== impl Http =====
249
#[cfg_attr(feature = "deprecated", allow(deprecated))]
#[cfg(any(feature = "http1", feature = "http2"))]
impl Http {
    /// Builds an `Http` configuration with every option at its default,
    /// ready to spawn a server or start accepting connections.
    ///
    /// The defaults are: the default executor, the default connection mode,
    /// HTTP/1 keep-alive enabled, and every other switch off or unset.
    pub fn new() -> Http {
        Self {
            // Protocol-independent settings.
            exec: Exec::Default,
            mode: ConnectionMode::default(),
            max_buf_size: None,
            pipeline_flush: false,
            // HTTP/1 settings.
            h1_half_close: false,
            h1_keep_alive: true,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            #[cfg(all(feature = "http1", feature = "runtime"))]
            h1_header_read_timeout: None,
            h1_writev: None,
            // HTTP/2 settings.
            #[cfg(feature = "http2")]
            h2_builder: Default::default(),
        }
    }
}
273
274#[cfg_attr(feature = "deprecated", allow(deprecated))]
275#[cfg(any(feature = "http1", feature = "http2"))]
276impl<E> Http<E> {
277    /// Sets whether HTTP1 is required.
278    ///
279    /// Default is false
280    #[cfg(feature = "http1")]
281    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
282    pub fn http1_only(&mut self, val: bool) -> &mut Self {
283        if val {
284            self.mode = ConnectionMode::H1Only;
285        } else {
286            #[cfg(feature = "http2")]
287            {
288                self.mode = ConnectionMode::Fallback;
289            }
290        }
291        self
292    }
293
    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shut down their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
        // Only recorded here; applied to the connection in `serve_connection`.
        self.h1_half_close = val;
        self
    }
308
    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// When set to `false`, connections created by `serve_connection` have
    /// keep-alive disabled from the start.
    ///
    /// Default is true.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
        self.h1_keep_alive = val;
        self
    }
318
    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
        // Only read when an HTTP/1 connection is built in `serve_connection`.
        self.h1_title_case_headers = enabled;
        self
    }
331
    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        // Only read when an HTTP/1 connection is built in `serve_connection`.
        self.h1_preserve_header_case = enabled;
        self
    }
351
    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// This method always stores a timeout; it cannot be used to clear one
    /// that was set earlier.
    ///
    /// Default is None.
    #[cfg(all(feature = "http1", feature = "runtime"))]
    #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
    pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
        self.h1_header_read_timeout = Some(read_timeout);
        self
    }
362
    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use the queued strategy,
    /// which may eliminate unnecessary cloning on some TLS backends.
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use.
    #[inline]
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_writev(&mut self, val: bool) -> &mut Self {
        // `Some(_)` overrides the `auto` behaviour (stored as `None`); once
        // set, this method cannot restore `auto`.
        self.h1_writev = Some(val);
        self
    }
382
383    /// Sets whether HTTP2 is required.
384    ///
385    /// Default is false
386    #[cfg(feature = "http2")]
387    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
388    pub fn http2_only(&mut self, val: bool) -> &mut Self {
389        if val {
390            self.mode = ConnectionMode::H2Only;
391        } else {
392            #[cfg(feature = "http1")]
393            {
394                self.mode = ConnectionMode::Fallback;
395            }
396        }
397        self
398    }
399
    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
    ///
    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
    /// As of v0.3.17, it is 20.
    ///
    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_pending_accept_reset_streams(
        &mut self,
        max: impl Into<Option<usize>>,
    ) -> &mut Self {
        // The converted value (including `None`) is stored as-is and replaces
        // whatever was configured before.
        self.h2_builder.max_pending_accept_reset_streams = max.into();

        self
    }
416
    /// Configures the maximum number of local error reset streams for HTTP2
    /// connections.
    ///
    /// The value is converted into an `Option<usize>` and stored in the
    /// HTTP/2 configuration as `max_local_error_reset_streams`, replacing any
    /// value configured earlier.
    ///
    /// This is a separate limit from the one set by
    /// `http2_max_pending_accept_reset_streams`.
    // NOTE(review): the default value and the exact effect of `None` are
    // decided by the HTTP/2 implementation, not by this file — confirm
    // against the `h2` crate before documenting concrete numbers here.
    // Presumably this caps streams reset because of locally detected errors
    // before the connection is closed — TODO confirm.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_local_error_reset_streams(
        &mut self,
        max: impl Into<Option<usize>>,
    ) -> &mut Self {
        self.h2_builder.max_local_error_reset_streams = max.into();

        self
    }
433
434    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
435    /// stream-level flow control.
436    ///
437    /// Passing `None` will do nothing.
438    ///
439    /// If not set, hyper will use a default.
440    ///
441    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
442    #[cfg(feature = "http2")]
443    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
444    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
445        if let Some(sz) = sz.into() {
446            self.h2_builder.adaptive_window = false;
447            self.h2_builder.initial_stream_window_size = sz;
448        }
449        self
450    }
451
452    /// Sets the max connection-level flow control for HTTP2.
453    ///
454    /// Passing `None` will do nothing.
455    ///
456    /// If not set, hyper will use a default.
457    #[cfg(feature = "http2")]
458    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
459    pub fn http2_initial_connection_window_size(
460        &mut self,
461        sz: impl Into<Option<u32>>,
462    ) -> &mut Self {
463        if let Some(sz) = sz.into() {
464            self.h2_builder.adaptive_window = false;
465            self.h2_builder.initial_conn_window_size = sz;
466        }
467        self
468    }
469
470    /// Sets whether to use an adaptive flow control.
471    ///
472    /// Enabling this will override the limits set in
473    /// `http2_initial_stream_window_size` and
474    /// `http2_initial_connection_window_size`.
475    #[cfg(feature = "http2")]
476    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
477    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
478        use proto::h2::SPEC_WINDOW_SIZE;
479
480        self.h2_builder.adaptive_window = enabled;
481        if enabled {
482            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
483            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
484        }
485        self
486    }
487
488    /// Sets the maximum frame size to use for HTTP2.
489    ///
490    /// Passing `None` will do nothing.
491    ///
492    /// If not set, hyper will use a default.
493    #[cfg(feature = "http2")]
494    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
495    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
496        if let Some(sz) = sz.into() {
497            self.h2_builder.max_frame_size = sz;
498        }
499        self
500    }
501
    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
    /// connections.
    ///
    /// Default is no limit (`std::u32::MAX`).
    ///
    /// Unlike the window-size and frame-size setters, the converted value is
    /// always stored: passing `None` replaces any limit configured earlier
    /// with `None`.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
    // NOTE(review): this doc used to claim that passing `None` does nothing,
    // which is only true if no limit had been set before. How a stored `None`
    // is interpreted is up to the HTTP/2 server config — confirm there.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.h2_builder.max_concurrent_streams = max.into();
        self
    }
514
515    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
516    /// connection alive.
517    ///
518    /// Pass `None` to disable HTTP2 keep-alive.
519    ///
520    /// Default is currently disabled.
521    ///
522    /// # Cargo Feature
523    ///
524    /// Requires the `runtime` cargo feature to be enabled.
525    #[cfg(feature = "runtime")]
526    #[cfg(feature = "http2")]
527    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
528    pub fn http2_keep_alive_interval(
529        &mut self,
530        interval: impl Into<Option<Duration>>,
531    ) -> &mut Self {
532        self.h2_builder.keep_alive_interval = interval.into();
533        self
534    }
535
536    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
537    ///
538    /// If the ping is not acknowledged within the timeout, the connection will
539    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
540    ///
541    /// Default is 20 seconds.
542    ///
543    /// # Cargo Feature
544    ///
545    /// Requires the `runtime` cargo feature to be enabled.
546    #[cfg(feature = "runtime")]
547    #[cfg(feature = "http2")]
548    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
549    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
550        self.h2_builder.keep_alive_timeout = timeout;
551        self
552    }
553
    /// Set the maximum write buffer size for each HTTP/2 stream.
    ///
    /// Default is currently ~400KB, but may change.
    ///
    /// # Panics
    ///
    /// Panics if `max` is larger than `u32::MAX`.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
        // Reject values that do not fit in a `u32` up front.
        assert!(max <= std::u32::MAX as usize);
        self.h2_builder.max_send_buffer_size = max;
        self
    }
568
569    /// Enables the [extended CONNECT protocol].
570    ///
571    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
572    #[cfg(feature = "http2")]
573    pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
574        self.h2_builder.enable_connect_protocol = true;
575        self
576    }
577
    /// Sets the max size of received header frames.
    ///
    /// Default is currently ~16MB, but may change.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
        // Stored in the HTTP/2 settings; no validation happens here.
        self.h2_builder.max_header_list_size = max;
        self
    }
587
    /// Set the maximum buffer size for the connection.
    ///
    /// The value is applied to HTTP/1 connections when they are created by
    /// `serve_connection`.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        // The lower bound is owned by the HTTP/1 implementation.
        assert!(
            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
        );
        self.max_buf_size = Some(max);
        self
    }
605
    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is false.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        // Within this file the flag is only read when an HTTP/1 connection is
        // built in `serve_connection`.
        self.pipeline_flush = enabled;
        self
    }
615
616    /// Set the executor used to spawn background tasks.
617    ///
618    /// Default uses implicit default (like `tokio::spawn`).
619    pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
620        Http {
621            exec,
622            h1_half_close: self.h1_half_close,
623            h1_keep_alive: self.h1_keep_alive,
624            h1_title_case_headers: self.h1_title_case_headers,
625            h1_preserve_header_case: self.h1_preserve_header_case,
626            #[cfg(all(feature = "http1", feature = "runtime"))]
627            h1_header_read_timeout: self.h1_header_read_timeout,
628            h1_writev: self.h1_writev,
629            #[cfg(feature = "http2")]
630            h2_builder: self.h2_builder,
631            mode: self.mode,
632            max_buf_size: self.max_buf_size,
633            pipeline_flush: self.pipeline_flush,
634        }
635    }
636
    /// Bind a connection together with a [`Service`](crate::service::Service).
    ///
    /// This returns a Future that must be polled in order for HTTP to be
    /// driven on the connection.
    ///
    /// The configured mode decides how the connection starts: HTTP/1-only and
    /// fallback modes start with the HTTP/1 dispatcher, HTTP/2-only mode
    /// starts with the HTTP/2 server. In fallback mode the returned
    /// `Connection` additionally keeps a copy of the HTTP/2 settings and the
    /// executor so it can switch protocols later.
    ///
    /// # Example
    ///
    /// ```
    /// # use hyper::{Body, Request, Response};
    /// # use hyper::service::Service;
    /// # use hyper::server::conn::Http;
    /// # use tokio::io::{AsyncRead, AsyncWrite};
    /// # async fn run<I, S>(some_io: I, some_service: S)
    /// # where
    /// #     I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    /// #     S: Service<hyper::Request<Body>, Response=hyper::Response<Body>> + Send + 'static,
    /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    /// #     S::Future: Send,
    /// # {
    /// let http = Http::new();
    /// let conn = http.serve_connection(some_io, some_service);
    ///
    /// if let Err(e) = conn.await {
    ///     eprintln!("server connection error: {}", e);
    /// }
    /// # }
    /// # fn main() {}
    /// ```
    pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
    where
        S: HttpService<Body, ResBody = Bd>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        Bd: HttpBody + 'static,
        Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: AsyncRead + AsyncWrite + Unpin,
        E: ConnStreamExec<S::Future, Bd>,
    {
        // Builds the HTTP/1 state machine from `io` and `service`, applying
        // every recorded HTTP/1 option. Written as a local macro so that it is
        // expanded inside whichever match arm below is compiled in.
        #[cfg(feature = "http1")]
        macro_rules! h1 {
            () => {{
                let mut conn = proto::Conn::new(io);
                // Each option is only applied when it differs from the
                // connection's own starting state.
                if !self.h1_keep_alive {
                    conn.disable_keep_alive();
                }
                if self.h1_half_close {
                    conn.set_allow_half_close();
                }
                if self.h1_title_case_headers {
                    conn.set_title_case_headers();
                }
                if self.h1_preserve_header_case {
                    conn.set_preserve_header_case();
                }
                #[cfg(all(feature = "http1", feature = "runtime"))]
                if let Some(header_read_timeout) = self.h1_header_read_timeout {
                    conn.set_http1_header_read_timeout(header_read_timeout);
                }
                // `None` keeps the connection's own write strategy.
                if let Some(writev) = self.h1_writev {
                    if writev {
                        conn.set_write_strategy_queue();
                    } else {
                        conn.set_write_strategy_flatten();
                    }
                }
                conn.set_flush_pipeline(self.pipeline_flush);
                if let Some(max) = self.max_buf_size {
                    conn.set_max_buf_size(max);
                }
                let sd = proto::h1::dispatch::Server::new(service);
                ProtoServer::H1 {
                    h1: proto::h1::Dispatcher::new(sd, conn),
                }
            }};
        }

        // Exactly one set of arms survives cfg-stripping, depending on which
        // protocol features are enabled.
        let proto = match self.mode {
            // Only HTTP/1 compiled in: `H1Only` is the sole variant.
            #[cfg(feature = "http1")]
            #[cfg(not(feature = "http2"))]
            ConnectionMode::H1Only => h1!(),
            // Both compiled in: fallback mode also starts out as HTTP/1.
            #[cfg(feature = "http2")]
            #[cfg(feature = "http1")]
            ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
            #[cfg(feature = "http2")]
            ConnectionMode::H2Only => {
                let rewind_io = Rewind::new(io);
                let h2 =
                    proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
                ProtoServer::H2 { h2 }
            }
        };

        Connection {
            conn: Some(proto),
            // Only fallback mode keeps what is needed for a later switch to
            // HTTP/2.
            #[cfg(all(feature = "http1", feature = "http2"))]
            fallback: if self.mode == ConnectionMode::Fallback {
                Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
            } else {
                Fallback::Http1Only
            },
            #[cfg(not(all(feature = "http1", feature = "http2")))]
            fallback: PhantomData,
        }
    }
740}
741
742// ===== impl Connection =====
743
744#[cfg(any(feature = "http1", feature = "http2"))]
745impl<I, B, S, E> Connection<I, S, E>
746where
747    S: HttpService<Body, ResBody = B>,
748    S::Error: Into<Box<dyn StdError + Send + Sync>>,
749    I: AsyncRead + AsyncWrite + Unpin,
750    B: HttpBody + 'static,
751    B::Error: Into<Box<dyn StdError + Send + Sync>>,
752    E: ConnStreamExec<S::Future, B>,
753{
    /// Start a graceful shutdown process for this connection.
    ///
    /// For an HTTP/1 connection this disables keep-alive; for an HTTP/2
    /// connection it asks the HTTP/2 server to shut down gracefully.
    ///
    /// This `Connection` should continue to be polled until shutdown
    /// can finish.
    ///
    /// # Note
    ///
    /// This should only be called while the `Connection` future is still
    /// pending. If called after `Connection::poll` has resolved, this does
    /// nothing.
    pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
        match self.conn {
            #[cfg(feature = "http1")]
            Some(ProtoServer::H1 { ref mut h1, .. }) => {
                h1.disable_keep_alive();
            }
            #[cfg(feature = "http2")]
            Some(ProtoServer::H2 { ref mut h2 }) => {
                h2.graceful_shutdown();
            }
            // No protocol state left to shut down.
            None => (),

            // With a protocol compiled out, its variant holds an uninhabited
            // placeholder; the empty matches prove these arms unreachable
            // while keeping the outer match exhaustive.
            #[cfg(not(feature = "http1"))]
            Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
            #[cfg(not(feature = "http2"))]
            Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
        }
    }
782
783    /// Return the inner IO object, and additional information.
784    ///
785    /// If the IO object has been "rewound" the io will not contain those bytes rewound.
786    /// This should only be called after `poll_without_shutdown` signals
787    /// that the connection is "done". Otherwise, it may not have finished
788    /// flushing all necessary HTTP bytes.
789    ///
790    /// # Panics
791    /// This method will panic if this connection is using an h2 protocol.
792    #[cfg_attr(feature = "deprecated", allow(deprecated))]
793    pub fn into_parts(self) -> Parts<I, S> {
794        self.try_into_parts()
795            .unwrap_or_else(|| panic!("h2 cannot into_inner"))
796    }
797
    /// Return the inner IO object, and additional information, if available.
    ///
    /// For an HTTP/1 connection the dispatcher is taken apart into the IO
    /// object, the unread buffer and the service.
    ///
    /// This method will return a `None` if this connection is using an h2 protocol.
    ///
    /// # Panics
    ///
    /// Panics if the connection's internal protocol state is absent (the
    /// `conn` slot is `None`).
    #[cfg_attr(feature = "deprecated", allow(deprecated))]
    pub fn try_into_parts(self) -> Option<Parts<I, S>> {
        match self.conn.unwrap() {
            #[cfg(feature = "http1")]
            ProtoServer::H1 { h1, .. } => {
                let (io, read_buf, dispatch) = h1.into_inner();
                Some(Parts {
                    io,
                    read_buf,
                    service: dispatch.into_service(),
                    _inner: (),
                })
            }
            ProtoServer::H2 { .. } => None,

            // HTTP/1 compiled out: the variant is uninhabited, so this arm
            // only exists to keep the match exhaustive.
            #[cfg(not(feature = "http1"))]
            ProtoServer::H1 { h1, .. } => match h1.0 {},
        }
    }
820
    /// Poll the connection for completion, but without calling `shutdown`
    /// on the underlying IO.
    ///
    /// This is useful to allow running a connection while doing an HTTP
    /// upgrade. Once the upgrade is completed, the connection would be "done",
    /// but it is not desired to actually shutdown the IO object. Instead you
    /// would take it back using `into_parts`.
    ///
    /// When both protocols are compiled in and fallback is allowed, an
    /// HTTP/1 parse error indicating an HTTP/2 preface switches the
    /// connection to HTTP/2 and polling continues with the new protocol.
    ///
    /// # Panics
    ///
    /// Panics if the connection's internal protocol state is absent (the
    /// `conn` slot is `None`).
    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // The loop only repeats after a protocol switch; every other path
        // returns.
        loop {
            match *self.conn.as_mut().unwrap() {
                #[cfg(feature = "http1")]
                ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
                    Ok(()) => return Poll::Ready(Ok(())),
                    Err(e) => {
                        // Only the "this is actually HTTP/2" parse error is
                        // recoverable, and only when fallback is permitted.
                        #[cfg(feature = "http2")]
                        match *e.kind() {
                            Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
                                // NOTE(review): relies on `upgrade_h2` putting
                                // an HTTP/2 state back into `self.conn` before
                                // the next iteration unwraps it — confirm.
                                self.upgrade_h2();
                                continue;
                            }
                            _ => (),
                        }

                        return Poll::Ready(Err(e));
                    }
                },
                #[cfg(feature = "http2")]
                ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),

                // Uninhabited placeholders for compiled-out protocols; the
                // empty matches keep the outer match exhaustive.
                #[cfg(not(feature = "http1"))]
                ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
                #[cfg(not(feature = "http2"))]
                ProtoServer::H2 { ref mut h2 } => match h2.0 {},
            };
        }
    }
857
858    /// Prevent shutdown of the underlying IO object at the end of service the request,
859    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
860    ///
861    /// # Error
862    ///
863    /// This errors if the underlying connection protocol is not HTTP/1.
864    #[cfg_attr(feature = "deprecated", allow(deprecated))]
865    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
866        let mut conn = Some(self);
867        futures_util::future::poll_fn(move |cx| {
868            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
869            Poll::Ready(
870                conn.take()
871                    .unwrap()
872                    .try_into_parts()
873                    .ok_or_else(crate::Error::new_without_shutdown_not_h1),
874            )
875        })
876    }
877
878    #[cfg(all(feature = "http1", feature = "http2"))]
879    fn upgrade_h2(&mut self) {
880        trace!("Trying to upgrade connection to h2");
881        let conn = self.conn.take();
882
883        let (io, read_buf, dispatch) = match conn.unwrap() {
884            ProtoServer::H1 { h1, .. } => h1.into_inner(),
885            ProtoServer::H2 { .. } => {
886                panic!("h2 cannot into_inner");
887            }
888        };
889        let mut rewind_io = Rewind::new(io);
890        rewind_io.rewind(read_buf);
891        let (builder, exec) = match self.fallback {
892            Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
893            Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
894        };
895        let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
896
897        debug_assert!(self.conn.is_none());
898        self.conn = Some(ProtoServer::H2 { h2 });
899    }
900
901    /// Enable this connection to support higher-level HTTP upgrades.
902    ///
903    /// See [the `upgrade` module](crate::upgrade) for more.
904    pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
905    where
906        I: Send,
907    {
908        UpgradeableConnection { inner: self }
909    }
910}
911
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, B, S, E> Future for Connection<I, S, E>
where
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: AsyncRead + AsyncWrite + Unpin,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<()>;

    /// Polls the protocol state until the connection is finished.
    ///
    /// Both a shutdown and a requested upgrade resolve to `Ok(())`. When both
    /// `http1` and `http2` are enabled, an "HTTP/2 version" parse error
    /// switches the connection to HTTP/2 (if the fallback allows it) and
    /// polling is retried; any other error is returned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            let err = match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
                Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                #[cfg(feature = "http1")]
                Ok(proto::Dispatched::Upgrade(pending)) => {
                    // `I` carries no `Send` bound in this impl, so the upgrade
                    // cannot be performed here. For users who went through
                    // `Body::on_upgrade` with this API, `manual()` delivers a
                    // special error that tells them so.
                    pending.manual();
                    return Poll::Ready(Ok(()));
                }
                Err(err) => err,
            };

            #[cfg(all(feature = "http1", feature = "http2"))]
            match *err.kind() {
                Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
                    self.upgrade_h2();
                    continue;
                }
                _ => (),
            }

            return Poll::Ready(Err(err));
        }
    }
}
958
#[cfg(any(feature = "http1", feature = "http2"))]
impl<I, S> fmt::Debug for Connection<I, S>
where
    S: HttpService<Body>,
{
    /// Formats the connection as just its type name; no fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Connection");
        repr.finish()
    }
}
968
969// ===== impl ConnectionMode =====
970
#[cfg(any(feature = "http1", feature = "http2"))]
impl Default for ConnectionMode {
    /// With both protocols compiled in, default to the fallback mode.
    #[cfg(all(feature = "http1", feature = "http2"))]
    fn default() -> Self {
        Self::Fallback
    }

    /// With only `http1` compiled in, HTTP/1 is the only possible mode.
    #[cfg(all(feature = "http1", not(feature = "http2")))]
    fn default() -> Self {
        Self::H1Only
    }

    /// With only `http2` compiled in, HTTP/2 is the only possible mode.
    #[cfg(all(not(feature = "http1"), feature = "http2"))]
    fn default() -> Self {
        Self::H2Only
    }
}
988
989// ===== impl ProtoServer =====
990
#[cfg(any(feature = "http1", feature = "http2"))]
impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
where
    T: AsyncRead + AsyncWrite + Unpin,
    S: HttpService<Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: ConnStreamExec<S::Future, B>,
{
    type Output = crate::Result<proto::Dispatched>;

    /// Forwards the poll to whichever protocol implementation is active.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let active = self.project();
        match active {
            #[cfg(feature = "http1")]
            ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
            // Payload is uninhabited when `http1` is disabled.
            #[cfg(not(feature = "http1"))]
            ProtoServerProj::H1 { h1, .. } => match h1.0 {},

            #[cfg(feature = "http2")]
            ProtoServerProj::H2 { h2 } => h2.poll(cx),
            // Payload is uninhabited when `http2` is disabled.
            #[cfg(not(feature = "http2"))]
            ProtoServerProj::H2 { h2 } => match h2.0 {},
        }
    }
}
1017
1018#[cfg(any(feature = "http1", feature = "http2"))]
1019mod upgrades {
1020    use super::*;
1021
    // A future binding a connection with a Service with Upgrade support.
    //
    // It wraps a plain `Connection` (the `inner` field) and is the value
    // returned by `Connection::with_upgrades`.
    //
    // This type is unnameable outside the crate, and so basically just an
    // `impl Future`, without requiring Rust 1.26 (the release that stabilized
    // `impl Trait` in return position).
    // NOTE(review): whether the type is re-exported is decided outside this
    // module — confirm the "unnameable" claim still holds.
    #[must_use = "futures do nothing unless polled"]
    #[allow(missing_debug_implementations)]
    pub struct UpgradeableConnection<T, S, E>
    where
        S: HttpService<Body>,
    {
        // The wrapped connection; visible to the parent module so that
        // `with_upgrades` can construct this type.
        pub(super) inner: Connection<T, S, E>,
    }
1034
1035    impl<I, B, S, E> UpgradeableConnection<I, S, E>
1036    where
1037        S: HttpService<Body, ResBody = B>,
1038        S::Error: Into<Box<dyn StdError + Send + Sync>>,
1039        I: AsyncRead + AsyncWrite + Unpin,
1040        B: HttpBody + 'static,
1041        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1042        E: ConnStreamExec<S::Future, B>,
1043    {
1044        /// Start a graceful shutdown process for this connection.
1045        ///
1046        /// This `Connection` should continue to be polled until shutdown
1047        /// can finish.
1048        pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
1049            Pin::new(&mut self.inner).graceful_shutdown()
1050        }
1051    }
1052
    // Drives the wrapped connection and, unlike the `Future` impl on
    // `Connection` itself, completes a requested upgrade by handing the IO
    // over to the pending upgrade.
    impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
    where
        S: HttpService<Body, ResBody = B>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        // NOTE(review): `Send + 'static` is additional to the bounds used by
        // `Connection`'s own impl — presumably required by `Upgraded::new`;
        // confirm.
        I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        B: HttpBody + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        E: ConnStreamExec<S::Future, B>,
    {
        type Output = crate::Result<()>;

        /// Polls the inner protocol state until the connection is finished.
        ///
        /// Resolves to `Ok(())` after a shutdown, or after an upgrade has
        /// been fulfilled with the connection's IO and read buffer. Errors
        /// are returned as-is, except that (with both `http1` and `http2`
        /// enabled) an "HTTP/2 version" parse error switches the connection
        /// to HTTP/2 when the fallback allows it, and polling is retried.
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            loop {
                match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
                    Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
                    #[cfg(feature = "http1")]
                    Ok(proto::Dispatched::Upgrade(pending)) => {
                        // Take the protocol state out of the connection so the
                        // IO and any buffered bytes can be moved into the
                        // upgraded stream.
                        match self.inner.conn.take() {
                            Some(ProtoServer::H1 { h1, .. }) => {
                                // The third element returned by `into_inner`
                                // is discarded here.
                                let (io, buf, _) = h1.into_inner();
                                pending.fulfill(Upgraded::new(io, buf));
                                return Poll::Ready(Ok(()));
                            }
                            _ => {
                                // Drop the pending upgrade explicitly before
                                // panicking on this unexpected state.
                                drop(pending);
                                unreachable!("Upgrade expects h1")
                            }
                        };
                    }
                    Err(e) => {
                        // Both attributes must hold: the fallback to HTTP/2
                        // only exists when `http1` and `http2` are enabled.
                        #[cfg(feature = "http1")]
                        #[cfg(feature = "http2")]
                        match *e.kind() {
                            Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
                                self.inner.upgrade_h2();
                                // Poll again, now against the HTTP/2 state.
                                continue;
                            }
                            _ => (),
                        }

                        return Poll::Ready(Err(e));
                    }
                }
            }
        }
    }
1099}