1use std::error::Error as StdError;
2use std::fmt;
3use std::future::Future;
4use std::marker::Unpin;
5#[cfg(feature = "tcp")]
6use std::net::{SocketAddr, TcpListener as StdTcpListener};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9#[cfg(feature = "tcp")]
10use std::time::Duration;
11
12use pin_project_lite::pin_project;
13
14use tokio::io::{AsyncRead, AsyncWrite};
15use tracing::trace;
16
17use super::accept::Accept;
18#[cfg(all(feature = "tcp"))]
19use super::tcp::AddrIncoming;
20use crate::body::{Body, HttpBody};
21use crate::common::exec::Exec;
22use crate::common::exec::{ConnStreamExec, NewSvcExec};
23use super::conn::{Connection, Http as Http_, UpgradeableConnection};
26use super::shutdown::{Graceful, GracefulWatcher};
27use crate::service::{HttpService, MakeServiceRef};
28
29use self::new_svc::NewSvcTask;
30
31pin_project! {
32 pub struct Server<I, S, E = Exec> {
39 #[pin]
40 incoming: I,
41 make_service: S,
42 protocol: Http_<E>,
43 }
44}
45
46#[derive(Debug)]
48#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
49pub struct Builder<I, E = Exec> {
50 incoming: I,
51 protocol: Http_<E>,
52}
53
54#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
57impl<I> Server<I, ()> {
58 pub fn builder(incoming: I) -> Builder<I> {
60 Builder {
61 incoming,
62 protocol: Http_::new(),
63 }
64 }
65}
66
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl Server<AddrIncoming, ()> {
    /// Binds to the provided address and returns a [`Builder`].
    ///
    /// # Panics
    ///
    /// Panics if creating the `AddrIncoming` for `addr` fails. Use
    /// [`Server::try_bind`] to handle that error instead.
    pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
        match AddrIncoming::new(addr) {
            Ok(incoming) => Server::builder(incoming),
            Err(e) => panic!("error binding to {}: {}", addr, e),
        }
    }

    /// Tries to bind to the provided address and returns a [`Builder`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by `AddrIncoming::new` if it fails.
    pub fn try_bind(addr: &SocketAddr) -> crate::Result<Builder<AddrIncoming>> {
        let incoming = AddrIncoming::new(addr)?;
        Ok(Server::builder(incoming))
    }

    /// Creates a [`Builder`] from an already created standard-library TCP
    /// listener.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `AddrIncoming::from_std` if it fails.
    pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, crate::Error> {
        let incoming = AddrIncoming::from_std(listener)?;
        Ok(Server::builder(incoming))
    }
}
96
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl<S, E> Server<AddrIncoming, S, E> {
    /// Returns the local address reported by this server's `AddrIncoming`.
    pub fn local_addr(&self) -> SocketAddr {
        let incoming = &self.incoming;
        incoming.local_addr()
    }
}
108
109#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
110impl<I, IO, IE, S, E, B> Server<I, S, E>
111where
112 I: Accept<Conn = IO, Error = IE>,
113 IE: Into<Box<dyn StdError + Send + Sync>>,
114 IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
115 S: MakeServiceRef<IO, Body, ResBody = B>,
116 S::Error: Into<Box<dyn StdError + Send + Sync>>,
117 B: HttpBody + 'static,
118 B::Error: Into<Box<dyn StdError + Send + Sync>>,
119 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
120{
121 pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
158 where
159 F: Future<Output = ()>,
160 E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
161 {
162 Graceful::new(self, signal)
163 }
164
165 fn poll_next_(
166 self: Pin<&mut Self>,
167 cx: &mut Context<'_>,
168 ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
169 let me = self.project();
170 match ready!(me.make_service.poll_ready_ref(cx)) {
171 Ok(()) => (),
172 Err(e) => {
173 trace!("make_service closed");
174 return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e))));
175 }
176 }
177
178 if let Some(item) = ready!(me.incoming.poll_accept(cx)) {
179 let io = item.map_err(crate::Error::new_accept)?;
180 let new_fut = me.make_service.make_service_ref(&io);
181 Poll::Ready(Some(Ok(Connecting {
182 future: new_fut,
183 io: Some(io),
184 protocol: me.protocol.clone(),
185 })))
186 } else {
187 Poll::Ready(None)
188 }
189 }
190
191 pub(super) fn poll_watch<W>(
192 mut self: Pin<&mut Self>,
193 cx: &mut Context<'_>,
194 watcher: &W,
195 ) -> Poll<crate::Result<()>>
196 where
197 E: NewSvcExec<IO, S::Future, S::Service, E, W>,
198 W: Watcher<IO, S::Service, E>,
199 {
200 loop {
201 if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) {
202 let fut = NewSvcTask::new(connecting, watcher.clone());
203 self.as_mut().project().protocol.exec.execute_new_svc(fut);
204 } else {
205 return Poll::Ready(Ok(()));
206 }
207 }
208 }
209}
210
211#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
212impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
213where
214 I: Accept<Conn = IO, Error = IE>,
215 IE: Into<Box<dyn StdError + Send + Sync>>,
216 IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
217 S: MakeServiceRef<IO, Body, ResBody = B>,
218 S::Error: Into<Box<dyn StdError + Send + Sync>>,
219 B: HttpBody + 'static,
220 B::Error: Into<Box<dyn StdError + Send + Sync>>,
221 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
222 E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
223{
224 type Output = crate::Result<()>;
225
226 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227 self.poll_watch(cx, &NoopWatcher)
228 }
229}
230
231impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 let mut st = f.debug_struct("Server");
234 st.field("listener", &self.incoming);
235 st.finish()
236 }
237}
238
239#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
242impl<I, E> Builder<I, E> {
243 pub fn new(incoming: I, protocol: Http_<E>) -> Self {
247 Builder { incoming, protocol }
248 }
249
250 #[cfg(feature = "http1")]
254 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
255 pub fn http1_keepalive(mut self, val: bool) -> Self {
256 self.protocol.http1_keep_alive(val);
257 self
258 }
259
260 #[cfg(feature = "http1")]
269 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
270 pub fn http1_half_close(mut self, val: bool) -> Self {
271 self.protocol.http1_half_close(val);
272 self
273 }
274
275 #[cfg(feature = "http1")]
279 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
280 pub fn http1_max_buf_size(mut self, val: usize) -> Self {
281 self.protocol.max_buf_size(val);
282 self
283 }
284
285 #[doc(hidden)]
290 #[cfg(feature = "http1")]
291 pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
292 self.protocol.pipeline_flush(val);
293 self
294 }
295
296 #[cfg(feature = "http1")]
309 pub fn http1_writev(mut self, enabled: bool) -> Self {
310 self.protocol.http1_writev(enabled);
311 self
312 }
313
314 #[cfg(feature = "http1")]
321 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
322 pub fn http1_title_case_headers(mut self, val: bool) -> Self {
323 self.protocol.http1_title_case_headers(val);
324 self
325 }
326
327 #[cfg(feature = "http1")]
341 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
342 pub fn http1_preserve_header_case(mut self, val: bool) -> Self {
343 self.protocol.http1_preserve_header_case(val);
344 self
345 }
346
347 #[cfg(all(feature = "http1", feature = "runtime"))]
352 #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
353 pub fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self {
354 self.protocol.http1_header_read_timeout(read_timeout);
355 self
356 }
357
358 #[cfg(feature = "http1")]
362 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
363 pub fn http1_only(mut self, val: bool) -> Self {
364 self.protocol.http1_only(val);
365 self
366 }
367
368 #[cfg(feature = "http2")]
372 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
373 pub fn http2_only(mut self, val: bool) -> Self {
374 self.protocol.http2_only(val);
375 self
376 }
377
378 #[cfg(feature = "http2")]
384 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
385 pub fn http2_max_pending_accept_reset_streams(mut self, max: impl Into<Option<usize>>) -> Self {
386 self.protocol.http2_max_pending_accept_reset_streams(max);
387 self
388 }
389
390 #[cfg(feature = "http2")]
399 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
400 pub fn http2_max_local_error_reset_streams(mut self, max: impl Into<Option<usize>>) -> Self {
401 self.protocol.http2_max_local_error_reset_streams(max);
402 self
403 }
404
405 #[cfg(feature = "http2")]
414 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
415 pub fn http2_initial_stream_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
416 self.protocol.http2_initial_stream_window_size(sz.into());
417 self
418 }
419
420 #[cfg(feature = "http2")]
426 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
427 pub fn http2_initial_connection_window_size(mut self, sz: impl Into<Option<u32>>) -> Self {
428 self.protocol
429 .http2_initial_connection_window_size(sz.into());
430 self
431 }
432
433 #[cfg(feature = "http2")]
439 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
440 pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
441 self.protocol.http2_adaptive_window(enabled);
442 self
443 }
444
445 #[cfg(feature = "http2")]
451 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
452 pub fn http2_max_frame_size(mut self, sz: impl Into<Option<u32>>) -> Self {
453 self.protocol.http2_max_frame_size(sz);
454 self
455 }
456
457 #[cfg(feature = "http2")]
461 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
462 pub fn http2_max_header_list_size(mut self, max: u32) -> Self {
463 self.protocol.http2_max_header_list_size(max);
464 self
465 }
466
467 #[cfg(feature = "http2")]
474 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
475 pub fn http2_max_concurrent_streams(mut self, max: impl Into<Option<u32>>) -> Self {
476 self.protocol.http2_max_concurrent_streams(max.into());
477 self
478 }
479
480 #[cfg(all(feature = "runtime", feature = "http2"))]
491 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
492 pub fn http2_keep_alive_interval(mut self, interval: impl Into<Option<Duration>>) -> Self {
493 self.protocol.http2_keep_alive_interval(interval);
494 self
495 }
496
497 #[cfg(all(feature = "runtime", feature = "http2"))]
508 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
509 pub fn http2_keep_alive_timeout(mut self, timeout: Duration) -> Self {
510 self.protocol.http2_keep_alive_timeout(timeout);
511 self
512 }
513
514 #[cfg(feature = "http2")]
522 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
523 pub fn http2_max_send_buf_size(mut self, max: usize) -> Self {
524 self.protocol.http2_max_send_buf_size(max);
525 self
526 }
527
528 #[cfg(feature = "http2")]
532 pub fn http2_enable_connect_protocol(mut self) -> Self {
533 self.protocol.http2_enable_connect_protocol();
534 self
535 }
536
537 pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
541 Builder {
542 incoming: self.incoming,
543 protocol: self.protocol.with_executor(executor),
544 }
545 }
546
547 pub fn serve<S, B>(self, make_service: S) -> Server<I, S, E>
578 where
579 I: Accept,
580 I::Error: Into<Box<dyn StdError + Send + Sync>>,
581 I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
582 S: MakeServiceRef<I::Conn, Body, ResBody = B>,
583 S::Error: Into<Box<dyn StdError + Send + Sync>>,
584 B: HttpBody + 'static,
585 B::Error: Into<Box<dyn StdError + Send + Sync>>,
586 E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
587 E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
588 {
589 Server {
590 incoming: self.incoming,
591 make_service,
592 protocol: self.protocol.clone(),
593 }
594 }
595}
596
#[cfg(feature = "tcp")]
#[cfg_attr(
    docsrs,
    doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)]
impl<E> Builder<AddrIncoming, E> {
    /// Forwards the TCP keep-alive setting to the underlying `AddrIncoming`.
    pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
        let incoming = &mut self.incoming;
        incoming.set_keepalive(keepalive);
        self
    }

    /// Forwards the TCP keep-alive interval setting to the underlying
    /// `AddrIncoming`.
    pub fn tcp_keepalive_interval(mut self, interval: Option<Duration>) -> Self {
        let incoming = &mut self.incoming;
        incoming.set_keepalive_interval(interval);
        self
    }

    /// Forwards the TCP keep-alive retries setting to the underlying
    /// `AddrIncoming`.
    pub fn tcp_keepalive_retries(mut self, retries: Option<u32>) -> Self {
        let incoming = &mut self.incoming;
        incoming.set_keepalive_retries(retries);
        self
    }

    /// Forwards the `TCP_NODELAY` setting to the underlying `AddrIncoming`.
    pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
        let incoming = &mut self.incoming;
        incoming.set_nodelay(enabled);
        self
    }

    /// Forwards the sleep-on-accept-errors setting to the underlying
    /// `AddrIncoming` (`set_sleep_on_errors`).
    pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
        let incoming = &mut self.incoming;
        incoming.set_sleep_on_errors(val);
        self
    }

    /// Returns the local address reported by the underlying `AddrIncoming`.
    pub fn local_addr(&self) -> SocketAddr {
        let incoming = &self.incoming;
        incoming.local_addr()
    }
}
658
659pub trait Watcher<I, S: HttpService<Body>, E>: Clone {
668 type Future: Future<Output = crate::Result<()>>;
669
670 fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
671}
672
/// A watcher that does not track connections at all.
///
/// As a field-less unit struct it can derive `Debug` trivially, so no lint
/// suppression is needed.
#[derive(Copy, Clone, Debug)]
pub struct NoopWatcher;
676
677impl<I, S, E> Watcher<I, S, E> for NoopWatcher
678where
679 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
680 S: HttpService<Body>,
681 E: ConnStreamExec<S::Future, S::ResBody>,
682 S::ResBody: 'static,
683 <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
684{
685 type Future = UpgradeableConnection<I, S, E>;
686
687 fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
688 conn
689 }
690}
691
692pub(crate) mod new_svc {
694 use std::error::Error as StdError;
695 use std::future::Future;
696 use std::marker::Unpin;
697 use std::pin::Pin;
698 use std::task::{Context, Poll};
699
700 use tokio::io::{AsyncRead, AsyncWrite};
701 use tracing::debug;
702
703 use super::{Connecting, Watcher};
704 use crate::body::{Body, HttpBody};
705 use crate::common::exec::ConnStreamExec;
706 use crate::service::HttpService;
707 use pin_project_lite::pin_project;
708
709 pin_project! {
720 #[allow(missing_debug_implementations)]
721 pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
722 #[pin]
723 state: State<I, N, S, E, W>,
724 }
725 }
726
727 pin_project! {
728 #[project = StateProj]
729 pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
730 Connecting {
731 #[pin]
732 connecting: Connecting<I, N, E>,
733 watcher: W,
734 },
735 Connected {
736 #[pin]
737 future: W::Future,
738 },
739 }
740 }
741
742 impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
743 pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
744 NewSvcTask {
745 state: State::Connecting {
746 connecting,
747 watcher,
748 },
749 }
750 }
751 }
752
753 impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W>
754 where
755 I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
756 N: Future<Output = Result<S, NE>>,
757 NE: Into<Box<dyn StdError + Send + Sync>>,
758 S: HttpService<Body, ResBody = B>,
759 B: HttpBody + 'static,
760 B::Error: Into<Box<dyn StdError + Send + Sync>>,
761 E: ConnStreamExec<S::Future, B>,
762 W: Watcher<I, S, E>,
763 {
764 type Output = ();
765
766 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
767 let mut me = self.project();
772 loop {
773 let next = {
774 match me.state.as_mut().project() {
775 StateProj::Connecting {
776 connecting,
777 watcher,
778 } => {
779 let res = ready!(connecting.poll(cx));
780 let conn = match res {
781 Ok(conn) => conn,
782 Err(err) => {
783 let err = crate::Error::new_user_make_service(err);
784 debug!("connecting error: {}", err);
785 return Poll::Ready(());
786 }
787 };
788 let future = watcher.watch(conn.with_upgrades());
789 State::Connected { future }
790 }
791 StateProj::Connected { future } => {
792 return future.poll(cx).map(|res| {
793 if let Err(err) = res {
794 debug!("connection error: {}", err);
795 }
796 });
797 }
798 }
799 };
800
801 me.state.set(next);
802 }
803 }
804 }
805}
806
807pin_project! {
808 #[must_use = "futures do nothing unless polled"]
813 #[derive(Debug)]
814 #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
815 pub struct Connecting<I, F, E = Exec> {
816 #[pin]
817 future: F,
818 io: Option<I>,
819 protocol: Http_<E>,
820 }
821}
822
823impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
824where
825 I: AsyncRead + AsyncWrite + Unpin,
826 F: Future<Output = Result<S, FE>>,
827 S: HttpService<Body, ResBody = B>,
828 B: HttpBody + 'static,
829 B::Error: Into<Box<dyn StdError + Send + Sync>>,
830 E: ConnStreamExec<S::Future, B>,
831{
832 type Output = Result<Connection<I, S, E>, FE>;
833
834 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
835 let mut me = self.project();
836 let service = ready!(me.future.poll(cx))?;
837 let io = Option::take(&mut me.io).expect("polled after complete");
838 Poll::Ready(Ok(me.protocol.serve_connection(io, service)))
839 }
840}