1#[cfg(all(
47 any(feature = "http1", feature = "http2"),
48 not(all(feature = "http1", feature = "http2"))
49))]
50use std::marker::PhantomData;
51#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
52use std::time::Duration;
53
54#[cfg(feature = "http2")]
55use crate::common::io::Rewind;
56#[cfg(all(feature = "http1", feature = "http2"))]
57use crate::error::{Kind, Parse};
58#[cfg(feature = "http1")]
59use crate::upgrade::Upgraded;
60
61#[cfg(all(feature = "backports", feature = "http1"))]
62pub mod http1;
63#[cfg(all(feature = "backports", feature = "http2"))]
64pub mod http2;
65
66cfg_feature! {
67 #![any(feature = "http1", feature = "http2")]
68
69 use std::error::Error as StdError;
70 use std::fmt;
71 use std::task::{Context, Poll};
72 use std::pin::Pin;
73 use std::future::Future;
74 use std::marker::Unpin;
75 #[cfg(not(all(feature = "http1", feature = "http2")))]
76 use std::convert::Infallible;
77
78 use bytes::Bytes;
79 use pin_project_lite::pin_project;
80 use tokio::io::{AsyncRead, AsyncWrite};
81 use tracing::trace;
82
83 pub use super::server::Connecting;
84 use crate::body::{Body, HttpBody};
85 use crate::common::exec::{ConnStreamExec, Exec};
86 use crate::proto;
87 use crate::service::HttpService;
88
89 pub(super) use self::upgrades::UpgradeableConnection;
90}
91
92#[cfg(feature = "tcp")]
93pub use super::tcp::{AddrIncoming, AddrStream};
94
95#[derive(Clone, Debug)]
102#[cfg(any(feature = "http1", feature = "http2"))]
103#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
104#[cfg_attr(
105 feature = "deprecated",
106 deprecated(
107 note = "This struct will be replaced with `server::conn::http1::Builder` and `server::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now."
108 )
109)]
110pub struct Http<E = Exec> {
111 pub(crate) exec: E,
112 h1_half_close: bool,
113 h1_keep_alive: bool,
114 h1_title_case_headers: bool,
115 h1_preserve_header_case: bool,
116 #[cfg(all(feature = "http1", feature = "runtime"))]
117 h1_header_read_timeout: Option<Duration>,
118 h1_writev: Option<bool>,
119 #[cfg(feature = "http2")]
120 h2_builder: proto::h2::server::Config,
121 mode: ConnectionMode,
122 max_buf_size: Option<usize>,
123 pipeline_flush: bool,
124}
125
126#[cfg(any(feature = "http1", feature = "http2"))]
128#[derive(Clone, Debug, PartialEq)]
129enum ConnectionMode {
130 #[cfg(feature = "http1")]
132 H1Only,
133 #[cfg(feature = "http2")]
135 H2Only,
136 #[cfg(all(feature = "http1", feature = "http2"))]
138 Fallback,
139}
140
141#[cfg(any(feature = "http1", feature = "http2"))]
142pin_project! {
143 #[must_use = "futures do nothing unless polled"]
147 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
148 pub struct Connection<T, S, E = Exec>
149 where
150 S: HttpService<Body>,
151 {
152 pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
153 fallback: Fallback<E>,
154 }
155}
156
157#[cfg(feature = "http1")]
158type Http1Dispatcher<T, B, S> =
159 proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;
160
161#[cfg(all(not(feature = "http1"), feature = "http2"))]
162type Http1Dispatcher<T, B, S> = (Infallible, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
163
164#[cfg(feature = "http2")]
165type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;
166
167#[cfg(all(not(feature = "http2"), feature = "http1"))]
168type Http2Server<T, B, S, E> = (
169 Infallible,
170 PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
171);
172
173#[cfg(any(feature = "http1", feature = "http2"))]
174pin_project! {
175 #[project = ProtoServerProj]
176 pub(super) enum ProtoServer<T, B, S, E = Exec>
177 where
178 S: HttpService<Body>,
179 B: HttpBody,
180 {
181 H1 {
182 #[pin]
183 h1: Http1Dispatcher<T, B, S>,
184 },
185 H2 {
186 #[pin]
187 h2: Http2Server<T, B, S, E>,
188 },
189 }
190}
191
192#[cfg(all(feature = "http1", feature = "http2"))]
193#[derive(Clone, Debug)]
194enum Fallback<E> {
195 ToHttp2(proto::h2::server::Config, E),
196 Http1Only,
197}
198
199#[cfg(all(
200 any(feature = "http1", feature = "http2"),
201 not(all(feature = "http1", feature = "http2"))
202))]
203type Fallback<E> = PhantomData<E>;
204
205#[cfg(all(feature = "http1", feature = "http2"))]
206impl<E> Fallback<E> {
207 fn to_h2(&self) -> bool {
208 match *self {
209 Fallback::ToHttp2(..) => true,
210 Fallback::Http1Only => false,
211 }
212 }
213}
214
215#[cfg(all(feature = "http1", feature = "http2"))]
216impl<E> Unpin for Fallback<E> {}
217
218#[derive(Debug)]
223#[cfg(any(feature = "http1", feature = "http2"))]
224#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
225#[cfg_attr(
226 feature = "deprecated",
227 deprecated(
228 note = "This struct will be replaced with `server::conn::http1::Parts` in 1.0, enable the \"backports\" feature to use them now."
229 )
230)]
231pub struct Parts<T, S> {
232 pub io: T,
234 pub read_buf: Bytes,
243 pub service: S,
245 _inner: (),
246}
247
248#[cfg_attr(feature = "deprecated", allow(deprecated))]
251#[cfg(any(feature = "http1", feature = "http2"))]
252impl Http {
253 pub fn new() -> Http {
256 Http {
257 exec: Exec::Default,
258 h1_half_close: false,
259 h1_keep_alive: true,
260 h1_title_case_headers: false,
261 h1_preserve_header_case: false,
262 #[cfg(all(feature = "http1", feature = "runtime"))]
263 h1_header_read_timeout: None,
264 h1_writev: None,
265 #[cfg(feature = "http2")]
266 h2_builder: Default::default(),
267 mode: ConnectionMode::default(),
268 max_buf_size: None,
269 pipeline_flush: false,
270 }
271 }
272}
273
274#[cfg_attr(feature = "deprecated", allow(deprecated))]
275#[cfg(any(feature = "http1", feature = "http2"))]
276impl<E> Http<E> {
277 #[cfg(feature = "http1")]
281 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
282 pub fn http1_only(&mut self, val: bool) -> &mut Self {
283 if val {
284 self.mode = ConnectionMode::H1Only;
285 } else {
286 #[cfg(feature = "http2")]
287 {
288 self.mode = ConnectionMode::Fallback;
289 }
290 }
291 self
292 }
293
294 #[cfg(feature = "http1")]
303 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
304 pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
305 self.h1_half_close = val;
306 self
307 }
308
309 #[cfg(feature = "http1")]
313 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
314 pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self {
315 self.h1_keep_alive = val;
316 self
317 }
318
319 #[cfg(feature = "http1")]
326 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
327 pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self {
328 self.h1_title_case_headers = enabled;
329 self
330 }
331
332 #[cfg(feature = "http1")]
346 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
347 pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self {
348 self.h1_preserve_header_case = enabled;
349 self
350 }
351
352 #[cfg(all(feature = "http1", feature = "runtime"))]
357 #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
358 pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
359 self.h1_header_read_timeout = Some(read_timeout);
360 self
361 }
362
363 #[inline]
376 #[cfg(feature = "http1")]
377 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
378 pub fn http1_writev(&mut self, val: bool) -> &mut Self {
379 self.h1_writev = Some(val);
380 self
381 }
382
383 #[cfg(feature = "http2")]
387 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
388 pub fn http2_only(&mut self, val: bool) -> &mut Self {
389 if val {
390 self.mode = ConnectionMode::H2Only;
391 } else {
392 #[cfg(feature = "http1")]
393 {
394 self.mode = ConnectionMode::Fallback;
395 }
396 }
397 self
398 }
399
400 #[cfg(feature = "http2")]
407 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
408 pub fn http2_max_pending_accept_reset_streams(
409 &mut self,
410 max: impl Into<Option<usize>>,
411 ) -> &mut Self {
412 self.h2_builder.max_pending_accept_reset_streams = max.into();
413
414 self
415 }
416
417 #[cfg(feature = "http2")]
424 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
425 pub fn http2_max_local_error_reset_streams(
426 &mut self,
427 max: impl Into<Option<usize>>,
428 ) -> &mut Self {
429 self.h2_builder.max_local_error_reset_streams = max.into();
430
431 self
432 }
433
434 #[cfg(feature = "http2")]
443 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
444 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
445 if let Some(sz) = sz.into() {
446 self.h2_builder.adaptive_window = false;
447 self.h2_builder.initial_stream_window_size = sz;
448 }
449 self
450 }
451
452 #[cfg(feature = "http2")]
458 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
459 pub fn http2_initial_connection_window_size(
460 &mut self,
461 sz: impl Into<Option<u32>>,
462 ) -> &mut Self {
463 if let Some(sz) = sz.into() {
464 self.h2_builder.adaptive_window = false;
465 self.h2_builder.initial_conn_window_size = sz;
466 }
467 self
468 }
469
470 #[cfg(feature = "http2")]
476 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
477 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
478 use proto::h2::SPEC_WINDOW_SIZE;
479
480 self.h2_builder.adaptive_window = enabled;
481 if enabled {
482 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
483 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
484 }
485 self
486 }
487
488 #[cfg(feature = "http2")]
494 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
495 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
496 if let Some(sz) = sz.into() {
497 self.h2_builder.max_frame_size = sz;
498 }
499 self
500 }
501
502 #[cfg(feature = "http2")]
509 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
510 pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
511 self.h2_builder.max_concurrent_streams = max.into();
512 self
513 }
514
515 #[cfg(feature = "runtime")]
526 #[cfg(feature = "http2")]
527 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
528 pub fn http2_keep_alive_interval(
529 &mut self,
530 interval: impl Into<Option<Duration>>,
531 ) -> &mut Self {
532 self.h2_builder.keep_alive_interval = interval.into();
533 self
534 }
535
536 #[cfg(feature = "runtime")]
547 #[cfg(feature = "http2")]
548 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
549 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
550 self.h2_builder.keep_alive_timeout = timeout;
551 self
552 }
553
554 #[cfg(feature = "http2")]
562 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
563 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
564 assert!(max <= std::u32::MAX as usize);
565 self.h2_builder.max_send_buffer_size = max;
566 self
567 }
568
569 #[cfg(feature = "http2")]
573 pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
574 self.h2_builder.enable_connect_protocol = true;
575 self
576 }
577
578 #[cfg(feature = "http2")]
582 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
583 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
584 self.h2_builder.max_header_list_size = max;
585 self
586 }
587
588 #[cfg(feature = "http1")]
596 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
597 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
598 assert!(
599 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
600 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
601 );
602 self.max_buf_size = Some(max);
603 self
604 }
605
606 pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
612 self.pipeline_flush = enabled;
613 self
614 }
615
616 pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
620 Http {
621 exec,
622 h1_half_close: self.h1_half_close,
623 h1_keep_alive: self.h1_keep_alive,
624 h1_title_case_headers: self.h1_title_case_headers,
625 h1_preserve_header_case: self.h1_preserve_header_case,
626 #[cfg(all(feature = "http1", feature = "runtime"))]
627 h1_header_read_timeout: self.h1_header_read_timeout,
628 h1_writev: self.h1_writev,
629 #[cfg(feature = "http2")]
630 h2_builder: self.h2_builder,
631 mode: self.mode,
632 max_buf_size: self.max_buf_size,
633 pipeline_flush: self.pipeline_flush,
634 }
635 }
636
637 pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
666 where
667 S: HttpService<Body, ResBody = Bd>,
668 S::Error: Into<Box<dyn StdError + Send + Sync>>,
669 Bd: HttpBody + 'static,
670 Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
671 I: AsyncRead + AsyncWrite + Unpin,
672 E: ConnStreamExec<S::Future, Bd>,
673 {
674 #[cfg(feature = "http1")]
675 macro_rules! h1 {
676 () => {{
677 let mut conn = proto::Conn::new(io);
678 if !self.h1_keep_alive {
679 conn.disable_keep_alive();
680 }
681 if self.h1_half_close {
682 conn.set_allow_half_close();
683 }
684 if self.h1_title_case_headers {
685 conn.set_title_case_headers();
686 }
687 if self.h1_preserve_header_case {
688 conn.set_preserve_header_case();
689 }
690 #[cfg(all(feature = "http1", feature = "runtime"))]
691 if let Some(header_read_timeout) = self.h1_header_read_timeout {
692 conn.set_http1_header_read_timeout(header_read_timeout);
693 }
694 if let Some(writev) = self.h1_writev {
695 if writev {
696 conn.set_write_strategy_queue();
697 } else {
698 conn.set_write_strategy_flatten();
699 }
700 }
701 conn.set_flush_pipeline(self.pipeline_flush);
702 if let Some(max) = self.max_buf_size {
703 conn.set_max_buf_size(max);
704 }
705 let sd = proto::h1::dispatch::Server::new(service);
706 ProtoServer::H1 {
707 h1: proto::h1::Dispatcher::new(sd, conn),
708 }
709 }};
710 }
711
712 let proto = match self.mode {
713 #[cfg(feature = "http1")]
714 #[cfg(not(feature = "http2"))]
715 ConnectionMode::H1Only => h1!(),
716 #[cfg(feature = "http2")]
717 #[cfg(feature = "http1")]
718 ConnectionMode::H1Only | ConnectionMode::Fallback => h1!(),
719 #[cfg(feature = "http2")]
720 ConnectionMode::H2Only => {
721 let rewind_io = Rewind::new(io);
722 let h2 =
723 proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
724 ProtoServer::H2 { h2 }
725 }
726 };
727
728 Connection {
729 conn: Some(proto),
730 #[cfg(all(feature = "http1", feature = "http2"))]
731 fallback: if self.mode == ConnectionMode::Fallback {
732 Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
733 } else {
734 Fallback::Http1Only
735 },
736 #[cfg(not(all(feature = "http1", feature = "http2")))]
737 fallback: PhantomData,
738 }
739 }
740}
741
742#[cfg(any(feature = "http1", feature = "http2"))]
745impl<I, B, S, E> Connection<I, S, E>
746where
747 S: HttpService<Body, ResBody = B>,
748 S::Error: Into<Box<dyn StdError + Send + Sync>>,
749 I: AsyncRead + AsyncWrite + Unpin,
750 B: HttpBody + 'static,
751 B::Error: Into<Box<dyn StdError + Send + Sync>>,
752 E: ConnStreamExec<S::Future, B>,
753{
754 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
765 match self.conn {
766 #[cfg(feature = "http1")]
767 Some(ProtoServer::H1 { ref mut h1, .. }) => {
768 h1.disable_keep_alive();
769 }
770 #[cfg(feature = "http2")]
771 Some(ProtoServer::H2 { ref mut h2 }) => {
772 h2.graceful_shutdown();
773 }
774 None => (),
775
776 #[cfg(not(feature = "http1"))]
777 Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
778 #[cfg(not(feature = "http2"))]
779 Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
780 }
781 }
782
783 #[cfg_attr(feature = "deprecated", allow(deprecated))]
793 pub fn into_parts(self) -> Parts<I, S> {
794 self.try_into_parts()
795 .unwrap_or_else(|| panic!("h2 cannot into_inner"))
796 }
797
798 #[cfg_attr(feature = "deprecated", allow(deprecated))]
802 pub fn try_into_parts(self) -> Option<Parts<I, S>> {
803 match self.conn.unwrap() {
804 #[cfg(feature = "http1")]
805 ProtoServer::H1 { h1, .. } => {
806 let (io, read_buf, dispatch) = h1.into_inner();
807 Some(Parts {
808 io,
809 read_buf,
810 service: dispatch.into_service(),
811 _inner: (),
812 })
813 }
814 ProtoServer::H2 { .. } => None,
815
816 #[cfg(not(feature = "http1"))]
817 ProtoServer::H1 { h1, .. } => match h1.0 {},
818 }
819 }
820
821 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
829 loop {
830 match *self.conn.as_mut().unwrap() {
831 #[cfg(feature = "http1")]
832 ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
833 Ok(()) => return Poll::Ready(Ok(())),
834 Err(e) => {
835 #[cfg(feature = "http2")]
836 match *e.kind() {
837 Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
838 self.upgrade_h2();
839 continue;
840 }
841 _ => (),
842 }
843
844 return Poll::Ready(Err(e));
845 }
846 },
847 #[cfg(feature = "http2")]
848 ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
849
850 #[cfg(not(feature = "http1"))]
851 ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
852 #[cfg(not(feature = "http2"))]
853 ProtoServer::H2 { ref mut h2 } => match h2.0 {},
854 };
855 }
856 }
857
858 #[cfg_attr(feature = "deprecated", allow(deprecated))]
865 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<I, S>>> {
866 let mut conn = Some(self);
867 futures_util::future::poll_fn(move |cx| {
868 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
869 Poll::Ready(
870 conn.take()
871 .unwrap()
872 .try_into_parts()
873 .ok_or_else(crate::Error::new_without_shutdown_not_h1),
874 )
875 })
876 }
877
878 #[cfg(all(feature = "http1", feature = "http2"))]
879 fn upgrade_h2(&mut self) {
880 trace!("Trying to upgrade connection to h2");
881 let conn = self.conn.take();
882
883 let (io, read_buf, dispatch) = match conn.unwrap() {
884 ProtoServer::H1 { h1, .. } => h1.into_inner(),
885 ProtoServer::H2 { .. } => {
886 panic!("h2 cannot into_inner");
887 }
888 };
889 let mut rewind_io = Rewind::new(io);
890 rewind_io.rewind(read_buf);
891 let (builder, exec) = match self.fallback {
892 Fallback::ToHttp2(ref builder, ref exec) => (builder, exec),
893 Fallback::Http1Only => unreachable!("upgrade_h2 with Fallback::Http1Only"),
894 };
895 let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
896
897 debug_assert!(self.conn.is_none());
898 self.conn = Some(ProtoServer::H2 { h2 });
899 }
900
901 pub fn with_upgrades(self) -> UpgradeableConnection<I, S, E>
905 where
906 I: Send,
907 {
908 UpgradeableConnection { inner: self }
909 }
910}
911
912#[cfg(any(feature = "http1", feature = "http2"))]
913impl<I, B, S, E> Future for Connection<I, S, E>
914where
915 S: HttpService<Body, ResBody = B>,
916 S::Error: Into<Box<dyn StdError + Send + Sync>>,
917 I: AsyncRead + AsyncWrite + Unpin,
918 B: HttpBody + 'static,
919 B::Error: Into<Box<dyn StdError + Send + Sync>>,
920 E: ConnStreamExec<S::Future, B>,
921{
922 type Output = crate::Result<()>;
923
924 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
925 loop {
926 match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) {
927 Ok(done) => {
928 match done {
929 proto::Dispatched::Shutdown => {}
930 #[cfg(feature = "http1")]
931 proto::Dispatched::Upgrade(pending) => {
932 pending.manual();
937 }
938 };
939 return Poll::Ready(Ok(()));
940 }
941 Err(e) => {
942 #[cfg(feature = "http1")]
943 #[cfg(feature = "http2")]
944 match *e.kind() {
945 Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => {
946 self.upgrade_h2();
947 continue;
948 }
949 _ => (),
950 }
951
952 return Poll::Ready(Err(e));
953 }
954 }
955 }
956 }
957}
958
959#[cfg(any(feature = "http1", feature = "http2"))]
960impl<I, S> fmt::Debug for Connection<I, S>
961where
962 S: HttpService<Body>,
963{
964 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
965 f.debug_struct("Connection").finish()
966 }
967}
968
969#[cfg(any(feature = "http1", feature = "http2"))]
972impl Default for ConnectionMode {
973 #[cfg(all(feature = "http1", feature = "http2"))]
974 fn default() -> ConnectionMode {
975 ConnectionMode::Fallback
976 }
977
978 #[cfg(all(feature = "http1", not(feature = "http2")))]
979 fn default() -> ConnectionMode {
980 ConnectionMode::H1Only
981 }
982
983 #[cfg(all(not(feature = "http1"), feature = "http2"))]
984 fn default() -> ConnectionMode {
985 ConnectionMode::H2Only
986 }
987}
988
989#[cfg(any(feature = "http1", feature = "http2"))]
992impl<T, B, S, E> Future for ProtoServer<T, B, S, E>
993where
994 T: AsyncRead + AsyncWrite + Unpin,
995 S: HttpService<Body, ResBody = B>,
996 S::Error: Into<Box<dyn StdError + Send + Sync>>,
997 B: HttpBody + 'static,
998 B::Error: Into<Box<dyn StdError + Send + Sync>>,
999 E: ConnStreamExec<S::Future, B>,
1000{
1001 type Output = crate::Result<proto::Dispatched>;
1002
1003 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1004 match self.project() {
1005 #[cfg(feature = "http1")]
1006 ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
1007 #[cfg(feature = "http2")]
1008 ProtoServerProj::H2 { h2 } => h2.poll(cx),
1009
1010 #[cfg(not(feature = "http1"))]
1011 ProtoServerProj::H1 { h1, .. } => match h1.0 {},
1012 #[cfg(not(feature = "http2"))]
1013 ProtoServerProj::H2 { h2 } => match h2.0 {},
1014 }
1015 }
1016}
1017
1018#[cfg(any(feature = "http1", feature = "http2"))]
1019mod upgrades {
1020 use super::*;
1021
1022 #[must_use = "futures do nothing unless polled"]
1027 #[allow(missing_debug_implementations)]
1028 pub struct UpgradeableConnection<T, S, E>
1029 where
1030 S: HttpService<Body>,
1031 {
1032 pub(super) inner: Connection<T, S, E>,
1033 }
1034
1035 impl<I, B, S, E> UpgradeableConnection<I, S, E>
1036 where
1037 S: HttpService<Body, ResBody = B>,
1038 S::Error: Into<Box<dyn StdError + Send + Sync>>,
1039 I: AsyncRead + AsyncWrite + Unpin,
1040 B: HttpBody + 'static,
1041 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1042 E: ConnStreamExec<S::Future, B>,
1043 {
1044 pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
1049 Pin::new(&mut self.inner).graceful_shutdown()
1050 }
1051 }
1052
1053 impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
1054 where
1055 S: HttpService<Body, ResBody = B>,
1056 S::Error: Into<Box<dyn StdError + Send + Sync>>,
1057 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1058 B: HttpBody + 'static,
1059 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1060 E: ConnStreamExec<S::Future, B>,
1061 {
1062 type Output = crate::Result<()>;
1063
1064 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1065 loop {
1066 match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) {
1067 Ok(proto::Dispatched::Shutdown) => return Poll::Ready(Ok(())),
1068 #[cfg(feature = "http1")]
1069 Ok(proto::Dispatched::Upgrade(pending)) => {
1070 match self.inner.conn.take() {
1071 Some(ProtoServer::H1 { h1, .. }) => {
1072 let (io, buf, _) = h1.into_inner();
1073 pending.fulfill(Upgraded::new(io, buf));
1074 return Poll::Ready(Ok(()));
1075 }
1076 _ => {
1077 drop(pending);
1078 unreachable!("Upgrade expects h1")
1079 }
1080 };
1081 }
1082 Err(e) => {
1083 #[cfg(feature = "http1")]
1084 #[cfg(feature = "http2")]
1085 match *e.kind() {
1086 Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => {
1087 self.inner.upgrade_h2();
1088 continue;
1089 }
1090 _ => (),
1091 }
1092
1093 return Poll::Ready(Err(e));
1094 }
1095 }
1096 }
1097 }
1098 }
1099}