1use std::error::Error as StdError;
2use std::future::Future;
3use std::marker::Unpin;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6#[cfg(feature = "runtime")]
7use std::time::Duration;
8
9use bytes::Bytes;
10use h2::server::{Connection, Handshake, SendResponse};
11use h2::{Reason, RecvStream};
12use http::{Method, Request};
13use pin_project_lite::pin_project;
14use tokio::io::{AsyncRead, AsyncWrite};
15use tracing::{debug, trace, warn};
16
17use super::{ping, PipeToSendStream, SendBuf};
18use crate::body::HttpBody;
19use crate::common::date;
20use crate::common::exec::ConnStreamExec;
21use crate::ext::Protocol;
22use crate::headers;
23use crate::proto::h2::ping::Recorder;
24use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
25use crate::proto::Dispatched;
26use crate::service::HttpService;
27
28use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29use crate::{Body, Response};
30
// Default values used by `Config::default()` for the HTTP/2 server settings.

/// Default connection-level flow-control window: 1 MiB.
const DEFAULT_CONN_WINDOW: u32 = 1 << 20;
/// Default per-stream flow-control window: 1 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 1 << 20;
/// Default maximum frame size: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default maximum send buffer size: 400 KiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 400 * 1024;
/// Default maximum header list size: 16 MiB.
const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024 * 1024;
/// Default cap on locally-reset (error) streams.
const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43
/// HTTP/2 server connection settings.
///
/// `Server::new` reads these values to configure the `h2` server builder and
/// the ping (BDP / keep-alive) configuration.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `initial_stream_window_size` is also handed to the ping
    /// configuration as its initial BDP window; when `false`, no BDP window
    /// is configured.
    pub(crate) adaptive_window: bool,
    /// Passed to the builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the builder's `max_frame_size`.
    pub(crate) max_frame_size: u32,
    /// When `true`, `enable_connect_protocol()` is called on the builder.
    pub(crate) enable_connect_protocol: bool,
    /// Applied to the builder only when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Applied to the builder only when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Always forwarded to the builder, including `None`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Forwarded to the ping configuration (only with the `runtime` feature).
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Forwarded to the ping configuration (only with the `runtime` feature).
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    /// Passed to the builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to the builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
}
61
62impl Default for Config {
63 fn default() -> Config {
64 Config {
65 adaptive_window: false,
66 initial_conn_window_size: DEFAULT_CONN_WINDOW,
67 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
68 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
69 enable_connect_protocol: false,
70 max_concurrent_streams: None,
71 max_pending_accept_reset_streams: None,
72 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
73 #[cfg(feature = "runtime")]
74 keep_alive_interval: None,
75 #[cfg(feature = "runtime")]
76 keep_alive_timeout: Duration::from_secs(20),
77 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
78 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
79 }
80 }
81}
82
pin_project! {
    // Future driving the server side of one HTTP/2 connection: it first
    // completes the h2 handshake, then accepts requests and hands each one
    // to `service`, running the resulting per-request future on `exec`.
    pub(crate) struct Server<T, S, B, E>
    where
        S: HttpService<Body>,
        B: HttpBody,
    {
        // Executor given each per-request `H2Stream` future.
        exec: E,
        // Service called once for every accepted request.
        service: S,
        // Connection lifecycle state (handshaking, serving, or closed).
        state: State<T, B>,
    }
}
94
95enum State<T, B>
96where
97 B: HttpBody,
98{
99 Handshaking {
100 ping_config: ping::Config,
101 hs: Handshake<T, SendBuf<B::Data>>,
102 },
103 Serving(Serving<T, B>),
104 Closed,
105}
106
107struct Serving<T, B>
108where
109 B: HttpBody,
110{
111 ping: Option<(ping::Recorder, ping::Ponger)>,
112 conn: Connection<T, SendBuf<B::Data>>,
113 closing: Option<crate::Error>,
114}
115
116impl<T, S, B, E> Server<T, S, B, E>
117where
118 T: AsyncRead + AsyncWrite + Unpin,
119 S: HttpService<Body, ResBody = B>,
120 S::Error: Into<Box<dyn StdError + Send + Sync>>,
121 B: HttpBody + 'static,
122 E: ConnStreamExec<S::Future, B>,
123{
124 pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
125 let mut builder = h2::server::Builder::default();
126 builder
127 .initial_window_size(config.initial_stream_window_size)
128 .initial_connection_window_size(config.initial_conn_window_size)
129 .max_frame_size(config.max_frame_size)
130 .max_header_list_size(config.max_header_list_size)
131 .max_local_error_reset_streams(config.max_local_error_reset_streams)
132 .max_send_buffer_size(config.max_send_buffer_size);
133 if let Some(max) = config.max_concurrent_streams {
134 builder.max_concurrent_streams(max);
135 }
136 if let Some(max) = config.max_pending_accept_reset_streams {
137 builder.max_pending_accept_reset_streams(max);
138 }
139 if config.enable_connect_protocol {
140 builder.enable_connect_protocol();
141 }
142 let handshake = builder.handshake(io);
143
144 let bdp = if config.adaptive_window {
145 Some(config.initial_stream_window_size)
146 } else {
147 None
148 };
149
150 let ping_config = ping::Config {
151 bdp_initial_window: bdp,
152 #[cfg(feature = "runtime")]
153 keep_alive_interval: config.keep_alive_interval,
154 #[cfg(feature = "runtime")]
155 keep_alive_timeout: config.keep_alive_timeout,
156 #[cfg(feature = "runtime")]
159 keep_alive_while_idle: true,
160 };
161
162 Server {
163 exec,
164 state: State::Handshaking {
165 ping_config,
166 hs: handshake,
167 },
168 service,
169 }
170 }
171
172 pub(crate) fn graceful_shutdown(&mut self) {
173 trace!("graceful_shutdown");
174 match self.state {
175 State::Handshaking { .. } => {
176 }
178 State::Serving(ref mut srv) => {
179 if srv.closing.is_none() {
180 srv.conn.graceful_shutdown();
181 }
182 return;
183 }
184 State::Closed => {
185 return;
186 }
187 }
188 self.state = State::Closed;
189 }
190}
191
192impl<T, S, B, E> Future for Server<T, S, B, E>
193where
194 T: AsyncRead + AsyncWrite + Unpin,
195 S: HttpService<Body, ResBody = B>,
196 S::Error: Into<Box<dyn StdError + Send + Sync>>,
197 B: HttpBody + 'static,
198 E: ConnStreamExec<S::Future, B>,
199{
200 type Output = crate::Result<Dispatched>;
201
202 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203 let me = &mut *self;
204 loop {
205 let next = match me.state {
206 State::Handshaking {
207 ref mut hs,
208 ref ping_config,
209 } => {
210 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
211 let ping = if ping_config.is_enabled() {
212 let pp = conn.ping_pong().expect("conn.ping_pong");
213 Some(ping::channel(pp, ping_config.clone()))
214 } else {
215 None
216 };
217 State::Serving(Serving {
218 ping,
219 conn,
220 closing: None,
221 })
222 }
223 State::Serving(ref mut srv) => {
224 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
225 return Poll::Ready(Ok(Dispatched::Shutdown));
226 }
227 State::Closed => {
228 return Poll::Ready(Ok(Dispatched::Shutdown));
231 }
232 };
233 me.state = next;
234 }
235 }
236}
237
238impl<T, B> Serving<T, B>
239where
240 T: AsyncRead + AsyncWrite + Unpin,
241 B: HttpBody + 'static,
242{
243 fn poll_server<S, E>(
244 &mut self,
245 cx: &mut Context<'_>,
246 service: &mut S,
247 exec: &mut E,
248 ) -> Poll<crate::Result<()>>
249 where
250 S: HttpService<Body, ResBody = B>,
251 S::Error: Into<Box<dyn StdError + Send + Sync>>,
252 E: ConnStreamExec<S::Future, B>,
253 {
254 if self.closing.is_none() {
255 loop {
256 self.poll_ping(cx);
257
258 match service.poll_ready(cx) {
263 Poll::Ready(Ok(())) => (),
264 Poll::Pending => {
265 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
268 trace!("incoming connection complete");
269 return Poll::Ready(Ok(()));
270 }
271 Poll::Ready(Err(err)) => {
272 let err = crate::Error::new_user_service(err);
273 debug!("service closed: {}", err);
274
275 let reason = err.h2_reason();
276 if reason == Reason::NO_ERROR {
277 trace!("interpreting NO_ERROR user error as graceful_shutdown");
279 self.conn.graceful_shutdown();
280 } else {
281 trace!("abruptly shutting down with {:?}", reason);
282 self.conn.abrupt_shutdown(reason);
283 }
284 self.closing = Some(err);
285 break;
286 }
287 }
288
289 match ready!(self.conn.poll_accept(cx)) {
291 Some(Ok((req, mut respond))) => {
292 trace!("incoming request");
293 let content_length = headers::content_length_parse_all(req.headers());
294 let ping = self
295 .ping
296 .as_ref()
297 .map(|ping| ping.0.clone())
298 .unwrap_or_else(ping::disabled);
299
300 ping.record_non_data();
302
303 let is_connect = req.method() == Method::CONNECT;
304 let (mut parts, stream) = req.into_parts();
305 let (mut req, connect_parts) = if !is_connect {
306 (
307 Request::from_parts(
308 parts,
309 crate::Body::h2(stream, content_length.into(), ping),
310 ),
311 None,
312 )
313 } else {
314 if content_length.map_or(false, |len| len != 0) {
315 warn!("h2 connect request with non-zero body not supported");
316 respond.send_reset(h2::Reason::INTERNAL_ERROR);
317 return Poll::Ready(Ok(()));
318 }
319 let (pending, upgrade) = crate::upgrade::pending();
320 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
321 parts.extensions.insert(upgrade);
322 (
323 Request::from_parts(parts, crate::Body::empty()),
324 Some(ConnectParts {
325 pending,
326 ping,
327 recv_stream: stream,
328 }),
329 )
330 };
331
332 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
333 req.extensions_mut().insert(Protocol::from_inner(protocol));
334 }
335
336 let fut = H2Stream::new(service.call(req), connect_parts, respond);
337 exec.execute_h2stream(fut);
338 }
339 Some(Err(e)) => {
340 return Poll::Ready(Err(crate::Error::new_h2(e)));
341 }
342 None => {
343 if let Some((ref ping, _)) = self.ping {
345 ping.ensure_not_timed_out()?;
346 }
347
348 trace!("incoming connection complete");
349 return Poll::Ready(Ok(()));
350 }
351 }
352 }
353 }
354
355 debug_assert!(
356 self.closing.is_some(),
357 "poll_server broke loop without closing"
358 );
359
360 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
361
362 Poll::Ready(Err(self.closing.take().expect("polled after error")))
363 }
364
365 fn poll_ping(&mut self, cx: &mut Context<'_>) {
366 if let Some((_, ref mut estimator)) = self.ping {
367 match estimator.poll(cx) {
368 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
369 self.conn.set_target_window_size(wnd);
370 let _ = self.conn.set_initial_window_size(wnd);
371 }
372 #[cfg(feature = "runtime")]
373 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
374 debug!("keep-alive timed out, closing connection");
375 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
376 }
377 Poll::Pending => {}
378 }
379 }
380 }
381}
382
pin_project! {
    // Future handling a single accepted request: it waits for the service's
    // response future, sends the response head through `reply`, and then
    // pipes the response body to the peer.
    #[allow(missing_debug_implementations)]
    pub struct H2Stream<F, B>
    where
        B: HttpBody,
    {
        // Handle used to send the response (or a reset) for this stream.
        reply: SendResponse<SendBuf<B::Data>>,
        // Current phase: waiting on the service, or streaming the body.
        #[pin]
        state: H2StreamState<F, B>,
    }
}
394
pin_project! {
    // The two phases an `H2Stream` goes through.
    #[project = H2StreamStateProj]
    enum H2StreamState<F, B>
    where
        B: HttpBody,
    {
        // Waiting for the service future to produce a response.
        // `connect_parts` is `Some` only for CONNECT requests.
        Service {
            #[pin]
            fut: F,
            connect_parts: Option<ConnectParts>,
        },
        // Response head sent; the body is being piped to the send stream.
        Body {
            #[pin]
            pipe: PipeToSendStream<B>,
        },
    }
}
412
/// Pieces of a CONNECT request kept aside until the response is known, so
/// that a successful response can fulfill the pending upgrade.
struct ConnectParts {
    /// Pending upgrade to fulfill with the upgraded IO.
    pending: Pending,
    /// Ping recorder handed to the upgraded stream.
    ping: Recorder,
    /// Receive half of the request stream, handed to the upgraded stream.
    recv_stream: RecvStream,
}
418
419impl<F, B> H2Stream<F, B>
420where
421 B: HttpBody,
422{
423 fn new(
424 fut: F,
425 connect_parts: Option<ConnectParts>,
426 respond: SendResponse<SendBuf<B::Data>>,
427 ) -> H2Stream<F, B> {
428 H2Stream {
429 reply: respond,
430 state: H2StreamState::Service { fut, connect_parts },
431 }
432 }
433}
434
// Sends the response head `$res` on `$me.reply` with end-of-stream flag
// `$eos` and evaluates to the resulting send stream.
//
// On failure the error is logged, the stream is reset with `INTERNAL_ERROR`,
// and the *enclosing function* returns `Poll::Ready(Err(..))` — so this macro
// may only be used inside a function returning `Poll<crate::Result<_>>`.
macro_rules! reply {
    ($me:expr, $res:expr, $eos:expr) => {{
        match $me.reply.send_response($res, $eos) {
            Ok(tx) => tx,
            Err(e) => {
                debug!("send response error: {}", e);
                $me.reply.send_reset(Reason::INTERNAL_ERROR);
                return Poll::Ready(Err(crate::Error::new_h2(e)));
            }
        }
    }};
}
447
448impl<F, B, E> H2Stream<F, B>
449where
450 F: Future<Output = Result<Response<B>, E>>,
451 B: HttpBody,
452 B::Data: 'static,
453 B::Error: Into<Box<dyn StdError + Send + Sync>>,
454 E: Into<Box<dyn StdError + Send + Sync>>,
455{
456 fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
457 let mut me = self.project();
458 loop {
459 let next = match me.state.as_mut().project() {
460 H2StreamStateProj::Service {
461 fut: h,
462 connect_parts,
463 } => {
464 let res = match h.poll(cx) {
465 Poll::Ready(Ok(r)) => r,
466 Poll::Pending => {
467 if let Poll::Ready(reason) =
470 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
471 {
472 debug!("stream received RST_STREAM: {:?}", reason);
473 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
474 }
475 return Poll::Pending;
476 }
477 Poll::Ready(Err(e)) => {
478 let err = crate::Error::new_user_service(e);
479 warn!("http2 service errored: {}", err);
480 me.reply.send_reset(err.h2_reason());
481 return Poll::Ready(Err(err));
482 }
483 };
484
485 let (head, body) = res.into_parts();
486 let mut res = ::http::Response::from_parts(head, ());
487 super::strip_connection_headers(res.headers_mut(), false);
488
489 res.headers_mut()
491 .entry(::http::header::DATE)
492 .or_insert_with(date::update_and_header_value);
493
494 if let Some(connect_parts) = connect_parts.take() {
495 if res.status().is_success() {
496 if headers::content_length_parse_all(res.headers())
497 .map_or(false, |len| len != 0)
498 {
499 warn!("h2 successful response to CONNECT request with body not supported");
500 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
501 return Poll::Ready(Err(crate::Error::new_user_header()));
502 }
503 let send_stream = reply!(me, res, false);
504 connect_parts.pending.fulfill(Upgraded::new(
505 H2Upgraded {
506 ping: connect_parts.ping,
507 recv_stream: connect_parts.recv_stream,
508 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
509 buf: Bytes::new(),
510 },
511 Bytes::new(),
512 ));
513 return Poll::Ready(Ok(()));
514 }
515 }
516
517 if !body.is_end_stream() {
518 if let Some(len) = body.size_hint().exact() {
520 headers::set_content_length_if_missing(res.headers_mut(), len);
521 }
522
523 let body_tx = reply!(me, res, false);
524 H2StreamState::Body {
525 pipe: PipeToSendStream::new(body, body_tx),
526 }
527 } else {
528 reply!(me, res, true);
529 return Poll::Ready(Ok(()));
530 }
531 }
532 H2StreamStateProj::Body { pipe } => {
533 return pipe.poll(cx);
534 }
535 };
536 me.state.set(next);
537 }
538 }
539}
540
541impl<F, B, E> Future for H2Stream<F, B>
542where
543 F: Future<Output = Result<Response<B>, E>>,
544 B: HttpBody,
545 B::Data: 'static,
546 B::Error: Into<Box<dyn StdError + Send + Sync>>,
547 E: Into<Box<dyn StdError + Send + Sync>>,
548{
549 type Output = ();
550
551 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552 self.poll2(cx).map(|res| {
553 if let Err(e) = res {
554 debug!("stream error: {}", e);
555 }
556 })
557 }
558}