hyper/proto/h2/
server.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::marker::Unpin;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6#[cfg(feature = "runtime")]
7use std::time::Duration;
8
9use bytes::Bytes;
10use h2::server::{Connection, Handshake, SendResponse};
11use h2::{Reason, RecvStream};
12use http::{Method, Request};
13use pin_project_lite::pin_project;
14use tokio::io::{AsyncRead, AsyncWrite};
15use tracing::{debug, trace, warn};
16
17use super::{ping, PipeToSendStream, SendBuf};
18use crate::body::HttpBody;
19use crate::common::date;
20use crate::common::exec::ConnStreamExec;
21use crate::ext::Protocol;
22use crate::headers;
23use crate::proto::h2::ping::Recorder;
24use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
25use crate::proto::Dispatched;
26use crate::service::HttpService;
27
28use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29use crate::{Body, Response};
30
31// Our defaults are chosen for the "majority" case, which usually are not
32// resource constrained, and so the spec default of 64kb can be too limiting
33// for performance.
34//
35// At the same time, a server more often has multiple clients connected, and
36// so is more likely to use more resources than a client would.
37const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
38const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
39const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
40const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
41const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 << 20; // 16 MB "sane default" taken from golang http2
42const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43
44#[derive(Clone, Debug)]
45pub(crate) struct Config {
46    pub(crate) adaptive_window: bool,
47    pub(crate) initial_conn_window_size: u32,
48    pub(crate) initial_stream_window_size: u32,
49    pub(crate) max_frame_size: u32,
50    pub(crate) enable_connect_protocol: bool,
51    pub(crate) max_concurrent_streams: Option<u32>,
52    pub(crate) max_pending_accept_reset_streams: Option<usize>,
53    pub(crate) max_local_error_reset_streams: Option<usize>,
54    #[cfg(feature = "runtime")]
55    pub(crate) keep_alive_interval: Option<Duration>,
56    #[cfg(feature = "runtime")]
57    pub(crate) keep_alive_timeout: Duration,
58    pub(crate) max_send_buffer_size: usize,
59    pub(crate) max_header_list_size: u32,
60}
61
62impl Default for Config {
63    fn default() -> Config {
64        Config {
65            adaptive_window: false,
66            initial_conn_window_size: DEFAULT_CONN_WINDOW,
67            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
68            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
69            enable_connect_protocol: false,
70            max_concurrent_streams: None,
71            max_pending_accept_reset_streams: None,
72            max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
73            #[cfg(feature = "runtime")]
74            keep_alive_interval: None,
75            #[cfg(feature = "runtime")]
76            keep_alive_timeout: Duration::from_secs(20),
77            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
78            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
79        }
80    }
81}
82
83pin_project! {
84    pub(crate) struct Server<T, S, B, E>
85    where
86        S: HttpService<Body>,
87        B: HttpBody,
88    {
89        exec: E,
90        service: S,
91        state: State<T, B>,
92    }
93}
94
95enum State<T, B>
96where
97    B: HttpBody,
98{
99    Handshaking {
100        ping_config: ping::Config,
101        hs: Handshake<T, SendBuf<B::Data>>,
102    },
103    Serving(Serving<T, B>),
104    Closed,
105}
106
107struct Serving<T, B>
108where
109    B: HttpBody,
110{
111    ping: Option<(ping::Recorder, ping::Ponger)>,
112    conn: Connection<T, SendBuf<B::Data>>,
113    closing: Option<crate::Error>,
114}
115
116impl<T, S, B, E> Server<T, S, B, E>
117where
118    T: AsyncRead + AsyncWrite + Unpin,
119    S: HttpService<Body, ResBody = B>,
120    S::Error: Into<Box<dyn StdError + Send + Sync>>,
121    B: HttpBody + 'static,
122    E: ConnStreamExec<S::Future, B>,
123{
124    pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
125        let mut builder = h2::server::Builder::default();
126        builder
127            .initial_window_size(config.initial_stream_window_size)
128            .initial_connection_window_size(config.initial_conn_window_size)
129            .max_frame_size(config.max_frame_size)
130            .max_header_list_size(config.max_header_list_size)
131            .max_local_error_reset_streams(config.max_local_error_reset_streams)
132            .max_send_buffer_size(config.max_send_buffer_size);
133        if let Some(max) = config.max_concurrent_streams {
134            builder.max_concurrent_streams(max);
135        }
136        if let Some(max) = config.max_pending_accept_reset_streams {
137            builder.max_pending_accept_reset_streams(max);
138        }
139        if config.enable_connect_protocol {
140            builder.enable_connect_protocol();
141        }
142        let handshake = builder.handshake(io);
143
144        let bdp = if config.adaptive_window {
145            Some(config.initial_stream_window_size)
146        } else {
147            None
148        };
149
150        let ping_config = ping::Config {
151            bdp_initial_window: bdp,
152            #[cfg(feature = "runtime")]
153            keep_alive_interval: config.keep_alive_interval,
154            #[cfg(feature = "runtime")]
155            keep_alive_timeout: config.keep_alive_timeout,
156            // If keep-alive is enabled for servers, always enabled while
157            // idle, so it can more aggressively close dead connections.
158            #[cfg(feature = "runtime")]
159            keep_alive_while_idle: true,
160        };
161
162        Server {
163            exec,
164            state: State::Handshaking {
165                ping_config,
166                hs: handshake,
167            },
168            service,
169        }
170    }
171
172    pub(crate) fn graceful_shutdown(&mut self) {
173        trace!("graceful_shutdown");
174        match self.state {
175            State::Handshaking { .. } => {
176                // fall-through, to replace state with Closed
177            }
178            State::Serving(ref mut srv) => {
179                if srv.closing.is_none() {
180                    srv.conn.graceful_shutdown();
181                }
182                return;
183            }
184            State::Closed => {
185                return;
186            }
187        }
188        self.state = State::Closed;
189    }
190}
191
192impl<T, S, B, E> Future for Server<T, S, B, E>
193where
194    T: AsyncRead + AsyncWrite + Unpin,
195    S: HttpService<Body, ResBody = B>,
196    S::Error: Into<Box<dyn StdError + Send + Sync>>,
197    B: HttpBody + 'static,
198    E: ConnStreamExec<S::Future, B>,
199{
200    type Output = crate::Result<Dispatched>;
201
202    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203        let me = &mut *self;
204        loop {
205            let next = match me.state {
206                State::Handshaking {
207                    ref mut hs,
208                    ref ping_config,
209                } => {
210                    let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
211                    let ping = if ping_config.is_enabled() {
212                        let pp = conn.ping_pong().expect("conn.ping_pong");
213                        Some(ping::channel(pp, ping_config.clone()))
214                    } else {
215                        None
216                    };
217                    State::Serving(Serving {
218                        ping,
219                        conn,
220                        closing: None,
221                    })
222                }
223                State::Serving(ref mut srv) => {
224                    ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
225                    return Poll::Ready(Ok(Dispatched::Shutdown));
226                }
227                State::Closed => {
228                    // graceful_shutdown was called before handshaking finished,
229                    // nothing to do here...
230                    return Poll::Ready(Ok(Dispatched::Shutdown));
231                }
232            };
233            me.state = next;
234        }
235    }
236}
237
238impl<T, B> Serving<T, B>
239where
240    T: AsyncRead + AsyncWrite + Unpin,
241    B: HttpBody + 'static,
242{
243    fn poll_server<S, E>(
244        &mut self,
245        cx: &mut Context<'_>,
246        service: &mut S,
247        exec: &mut E,
248    ) -> Poll<crate::Result<()>>
249    where
250        S: HttpService<Body, ResBody = B>,
251        S::Error: Into<Box<dyn StdError + Send + Sync>>,
252        E: ConnStreamExec<S::Future, B>,
253    {
254        if self.closing.is_none() {
255            loop {
256                self.poll_ping(cx);
257
258                // Check that the service is ready to accept a new request.
259                //
260                // - If not, just drive the connection some.
261                // - If ready, try to accept a new request from the connection.
262                match service.poll_ready(cx) {
263                    Poll::Ready(Ok(())) => (),
264                    Poll::Pending => {
265                        // use `poll_closed` instead of `poll_accept`,
266                        // in order to avoid accepting a request.
267                        ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
268                        trace!("incoming connection complete");
269                        return Poll::Ready(Ok(()));
270                    }
271                    Poll::Ready(Err(err)) => {
272                        let err = crate::Error::new_user_service(err);
273                        debug!("service closed: {}", err);
274
275                        let reason = err.h2_reason();
276                        if reason == Reason::NO_ERROR {
277                            // NO_ERROR is only used for graceful shutdowns...
278                            trace!("interpreting NO_ERROR user error as graceful_shutdown");
279                            self.conn.graceful_shutdown();
280                        } else {
281                            trace!("abruptly shutting down with {:?}", reason);
282                            self.conn.abrupt_shutdown(reason);
283                        }
284                        self.closing = Some(err);
285                        break;
286                    }
287                }
288
289                // When the service is ready, accepts an incoming request.
290                match ready!(self.conn.poll_accept(cx)) {
291                    Some(Ok((req, mut respond))) => {
292                        trace!("incoming request");
293                        let content_length = headers::content_length_parse_all(req.headers());
294                        let ping = self
295                            .ping
296                            .as_ref()
297                            .map(|ping| ping.0.clone())
298                            .unwrap_or_else(ping::disabled);
299
300                        // Record the headers received
301                        ping.record_non_data();
302
303                        let is_connect = req.method() == Method::CONNECT;
304                        let (mut parts, stream) = req.into_parts();
305                        let (mut req, connect_parts) = if !is_connect {
306                            (
307                                Request::from_parts(
308                                    parts,
309                                    crate::Body::h2(stream, content_length.into(), ping),
310                                ),
311                                None,
312                            )
313                        } else {
314                            if content_length.map_or(false, |len| len != 0) {
315                                warn!("h2 connect request with non-zero body not supported");
316                                respond.send_reset(h2::Reason::INTERNAL_ERROR);
317                                return Poll::Ready(Ok(()));
318                            }
319                            let (pending, upgrade) = crate::upgrade::pending();
320                            debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
321                            parts.extensions.insert(upgrade);
322                            (
323                                Request::from_parts(parts, crate::Body::empty()),
324                                Some(ConnectParts {
325                                    pending,
326                                    ping,
327                                    recv_stream: stream,
328                                }),
329                            )
330                        };
331
332                        if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
333                            req.extensions_mut().insert(Protocol::from_inner(protocol));
334                        }
335
336                        let fut = H2Stream::new(service.call(req), connect_parts, respond);
337                        exec.execute_h2stream(fut);
338                    }
339                    Some(Err(e)) => {
340                        return Poll::Ready(Err(crate::Error::new_h2(e)));
341                    }
342                    None => {
343                        // no more incoming streams...
344                        if let Some((ref ping, _)) = self.ping {
345                            ping.ensure_not_timed_out()?;
346                        }
347
348                        trace!("incoming connection complete");
349                        return Poll::Ready(Ok(()));
350                    }
351                }
352            }
353        }
354
355        debug_assert!(
356            self.closing.is_some(),
357            "poll_server broke loop without closing"
358        );
359
360        ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
361
362        Poll::Ready(Err(self.closing.take().expect("polled after error")))
363    }
364
365    fn poll_ping(&mut self, cx: &mut Context<'_>) {
366        if let Some((_, ref mut estimator)) = self.ping {
367            match estimator.poll(cx) {
368                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
369                    self.conn.set_target_window_size(wnd);
370                    let _ = self.conn.set_initial_window_size(wnd);
371                }
372                #[cfg(feature = "runtime")]
373                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
374                    debug!("keep-alive timed out, closing connection");
375                    self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
376                }
377                Poll::Pending => {}
378            }
379        }
380    }
381}
382
383pin_project! {
384    #[allow(missing_debug_implementations)]
385    pub struct H2Stream<F, B>
386    where
387        B: HttpBody,
388    {
389        reply: SendResponse<SendBuf<B::Data>>,
390        #[pin]
391        state: H2StreamState<F, B>,
392    }
393}
394
395pin_project! {
396    #[project = H2StreamStateProj]
397    enum H2StreamState<F, B>
398    where
399        B: HttpBody,
400    {
401        Service {
402            #[pin]
403            fut: F,
404            connect_parts: Option<ConnectParts>,
405        },
406        Body {
407            #[pin]
408            pipe: PipeToSendStream<B>,
409        },
410    }
411}
412
413struct ConnectParts {
414    pending: Pending,
415    ping: Recorder,
416    recv_stream: RecvStream,
417}
418
419impl<F, B> H2Stream<F, B>
420where
421    B: HttpBody,
422{
423    fn new(
424        fut: F,
425        connect_parts: Option<ConnectParts>,
426        respond: SendResponse<SendBuf<B::Data>>,
427    ) -> H2Stream<F, B> {
428        H2Stream {
429            reply: respond,
430            state: H2StreamState::Service { fut, connect_parts },
431        }
432    }
433}
434
435macro_rules! reply {
436    ($me:expr, $res:expr, $eos:expr) => {{
437        match $me.reply.send_response($res, $eos) {
438            Ok(tx) => tx,
439            Err(e) => {
440                debug!("send response error: {}", e);
441                $me.reply.send_reset(Reason::INTERNAL_ERROR);
442                return Poll::Ready(Err(crate::Error::new_h2(e)));
443            }
444        }
445    }};
446}
447
448impl<F, B, E> H2Stream<F, B>
449where
450    F: Future<Output = Result<Response<B>, E>>,
451    B: HttpBody,
452    B::Data: 'static,
453    B::Error: Into<Box<dyn StdError + Send + Sync>>,
454    E: Into<Box<dyn StdError + Send + Sync>>,
455{
456    fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
457        let mut me = self.project();
458        loop {
459            let next = match me.state.as_mut().project() {
460                H2StreamStateProj::Service {
461                    fut: h,
462                    connect_parts,
463                } => {
464                    let res = match h.poll(cx) {
465                        Poll::Ready(Ok(r)) => r,
466                        Poll::Pending => {
467                            // Response is not yet ready, so we want to check if the client has sent a
468                            // RST_STREAM frame which would cancel the current request.
469                            if let Poll::Ready(reason) =
470                                me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
471                            {
472                                debug!("stream received RST_STREAM: {:?}", reason);
473                                return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
474                            }
475                            return Poll::Pending;
476                        }
477                        Poll::Ready(Err(e)) => {
478                            let err = crate::Error::new_user_service(e);
479                            warn!("http2 service errored: {}", err);
480                            me.reply.send_reset(err.h2_reason());
481                            return Poll::Ready(Err(err));
482                        }
483                    };
484
485                    let (head, body) = res.into_parts();
486                    let mut res = ::http::Response::from_parts(head, ());
487                    super::strip_connection_headers(res.headers_mut(), false);
488
489                    // set Date header if it isn't already set...
490                    res.headers_mut()
491                        .entry(::http::header::DATE)
492                        .or_insert_with(date::update_and_header_value);
493
494                    if let Some(connect_parts) = connect_parts.take() {
495                        if res.status().is_success() {
496                            if headers::content_length_parse_all(res.headers())
497                                .map_or(false, |len| len != 0)
498                            {
499                                warn!("h2 successful response to CONNECT request with body not supported");
500                                me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
501                                return Poll::Ready(Err(crate::Error::new_user_header()));
502                            }
503                            let send_stream = reply!(me, res, false);
504                            connect_parts.pending.fulfill(Upgraded::new(
505                                H2Upgraded {
506                                    ping: connect_parts.ping,
507                                    recv_stream: connect_parts.recv_stream,
508                                    send_stream: unsafe { UpgradedSendStream::new(send_stream) },
509                                    buf: Bytes::new(),
510                                },
511                                Bytes::new(),
512                            ));
513                            return Poll::Ready(Ok(()));
514                        }
515                    }
516
517                    if !body.is_end_stream() {
518                        // automatically set Content-Length from body...
519                        if let Some(len) = body.size_hint().exact() {
520                            headers::set_content_length_if_missing(res.headers_mut(), len);
521                        }
522
523                        let body_tx = reply!(me, res, false);
524                        H2StreamState::Body {
525                            pipe: PipeToSendStream::new(body, body_tx),
526                        }
527                    } else {
528                        reply!(me, res, true);
529                        return Poll::Ready(Ok(()));
530                    }
531                }
532                H2StreamStateProj::Body { pipe } => {
533                    return pipe.poll(cx);
534                }
535            };
536            me.state.set(next);
537        }
538    }
539}
540
541impl<F, B, E> Future for H2Stream<F, B>
542where
543    F: Future<Output = Result<Response<B>, E>>,
544    B: HttpBody,
545    B::Data: 'static,
546    B::Error: Into<Box<dyn StdError + Send + Sync>>,
547    E: Into<Box<dyn StdError + Send + Sync>>,
548{
549    type Output = ();
550
551    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552        self.poll2(cx).map(|res| {
553            if let Err(e) = res {
554                debug!("stream error: {}", e);
555            }
556        })
557    }
558}