hyper/proto/h2/
client.rs

1use std::convert::Infallible;
2use std::error::Error as StdError;
3use std::future::Future;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7#[cfg(feature = "runtime")]
8use std::time::Duration;
9
10use bytes::Bytes;
11use futures_channel::{mpsc, oneshot};
12use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
13use futures_util::stream::StreamExt as _;
14use h2::client::{Builder, SendRequest};
15use h2::SendStream;
16use http::{Method, StatusCode};
17use tokio::io::{AsyncRead, AsyncWrite};
18use tracing::{debug, trace, warn};
19
20use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
21use crate::body::HttpBody;
22use crate::client::dispatch::Callback;
23use crate::common::exec::Exec;
24use crate::ext::Protocol;
25use crate::headers;
26use crate::proto::h2::UpgradedSendStream;
27use crate::proto::Dispatched;
28use crate::upgrade::Upgraded;
29use crate::{Body, Request, Response};
30use h2::client::ResponseFuture;
31
32type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
33
34///// An mpsc channel is used to help notify the `Connection` task when *all*
35///// other handles to it have been dropped, so that it can shutdown.
36type ConnDropRef = mpsc::Sender<Infallible>;
37
38///// A oneshot channel watches the `Connection` task, and when it completes,
39///// the "dispatch" task will be notified and can shutdown sooner.
40type ConnEof = oneshot::Receiver<Infallible>;
41
// These defaults target the "majority" case of peers that are not resource
// constrained; for them the spec default window of 64kb can be too limiting
// for performance.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
49
/// Settings for the client side of an HTTP/2 connection.
///
/// `new_builder` forwards the window, frame and buffer sizes to the h2
/// `Builder`; `new_ping_config` forwards the adaptive-window and keep-alive
/// settings to `ping::Config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, the ping configuration is given a BDP initial window
    /// seeded from `initial_stream_window_size`.
    pub(crate) adaptive_window: bool,
    /// Passed to the builder as the initial connection window size.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the builder as the initial (per-stream) window size.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the builder as the max frame size.
    pub(crate) max_frame_size: u32,
    /// Keep-alive interval handed to the ping machinery (`None` by default).
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Keep-alive timeout handed to the ping machinery.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_timeout: Duration,
    /// Keep-alive-while-idle flag handed to the ping machinery.
    #[cfg(feature = "runtime")]
    pub(crate) keep_alive_while_idle: bool,
    /// Forwarded to the builder only when `Some`.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to the builder as the max send buffer size.
    pub(crate) max_send_buffer_size: usize,
}
65
66impl Default for Config {
67    fn default() -> Config {
68        Config {
69            adaptive_window: false,
70            initial_conn_window_size: DEFAULT_CONN_WINDOW,
71            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
72            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
73            #[cfg(feature = "runtime")]
74            keep_alive_interval: None,
75            #[cfg(feature = "runtime")]
76            keep_alive_timeout: Duration::from_secs(20),
77            #[cfg(feature = "runtime")]
78            keep_alive_while_idle: false,
79            max_concurrent_reset_streams: None,
80            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
81        }
82    }
83}
84
85fn new_builder(config: &Config) -> Builder {
86    let mut builder = Builder::default();
87    builder
88        .initial_window_size(config.initial_stream_window_size)
89        .initial_connection_window_size(config.initial_conn_window_size)
90        .max_frame_size(config.max_frame_size)
91        .max_send_buffer_size(config.max_send_buffer_size)
92        .enable_push(false);
93    if let Some(max) = config.max_concurrent_reset_streams {
94        builder.max_concurrent_reset_streams(max);
95    }
96    builder
97}
98
99fn new_ping_config(config: &Config) -> ping::Config {
100    ping::Config {
101        bdp_initial_window: if config.adaptive_window {
102            Some(config.initial_stream_window_size)
103        } else {
104            None
105        },
106        #[cfg(feature = "runtime")]
107        keep_alive_interval: config.keep_alive_interval,
108        #[cfg(feature = "runtime")]
109        keep_alive_timeout: config.keep_alive_timeout,
110        #[cfg(feature = "runtime")]
111        keep_alive_while_idle: config.keep_alive_while_idle,
112    }
113}
114
115pub(crate) async fn handshake<T, B>(
116    io: T,
117    req_rx: ClientRx<B>,
118    config: &Config,
119    exec: Exec,
120) -> crate::Result<ClientTask<B>>
121where
122    T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
123    B: HttpBody,
124    B::Data: Send + 'static,
125{
126    let (h2_tx, mut conn) = new_builder(config)
127        .handshake::<_, SendBuf<B::Data>>(io)
128        .await
129        .map_err(crate::Error::new_h2)?;
130
131    // An mpsc channel is used entirely to detect when the
132    // 'Client' has been dropped. This is to get around a bug
133    // in h2 where dropping all SendRequests won't notify a
134    // parked Connection.
135    let (conn_drop_ref, rx) = mpsc::channel(1);
136    let (cancel_tx, conn_eof) = oneshot::channel();
137
138    let conn_drop_rx = rx.into_future().map(|(item, _rx)| {
139        if let Some(never) = item {
140            match never {}
141        }
142    });
143
144    let ping_config = new_ping_config(&config);
145
146    let (conn, ping) = if ping_config.is_enabled() {
147        let pp = conn.ping_pong().expect("conn.ping_pong");
148        let (recorder, mut ponger) = ping::channel(pp, ping_config);
149
150        let conn = future::poll_fn(move |cx| {
151            match ponger.poll(cx) {
152                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
153                    conn.set_target_window_size(wnd);
154                    conn.set_initial_window_size(wnd)?;
155                }
156                #[cfg(feature = "runtime")]
157                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
158                    debug!("connection keep-alive timed out");
159                    return Poll::Ready(Ok(()));
160                }
161                Poll::Pending => {}
162            }
163
164            Pin::new(&mut conn).poll(cx)
165        });
166        (Either::Left(conn), recorder)
167    } else {
168        (Either::Right(conn), ping::disabled())
169    };
170    let conn = conn.map_err(|e| debug!("connection error: {}", e));
171
172    exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
173
174    Ok(ClientTask {
175        ping,
176        conn_drop_ref,
177        conn_eof,
178        executor: exec,
179        h2_tx,
180        req_rx,
181        fut_ctx: None,
182    })
183}
184
185async fn conn_task<C, D>(conn: C, drop_rx: D, cancel_tx: oneshot::Sender<Infallible>)
186where
187    C: Future + Unpin,
188    D: Future<Output = ()> + Unpin,
189{
190    match future::select(conn, drop_rx).await {
191        Either::Left(_) => {
192            // ok or err, the `conn` has finished
193        }
194        Either::Right(((), conn)) => {
195            // mpsc has been dropped, hopefully polling
196            // the connection some more should start shutdown
197            // and then close
198            trace!("send_request dropped, starting conn shutdown");
199            drop(cancel_tx);
200            let _ = conn.await;
201        }
202    }
203}
204
205struct FutCtx<B>
206where
207    B: HttpBody,
208{
209    is_connect: bool,
210    eos: bool,
211    fut: ResponseFuture,
212    body_tx: SendStream<SendBuf<B::Data>>,
213    body: B,
214    cb: Callback<Request<B>, Response<Body>>,
215}
216
217impl<B: HttpBody> Unpin for FutCtx<B> {}
218
219pub(crate) struct ClientTask<B>
220where
221    B: HttpBody,
222{
223    ping: ping::Recorder,
224    conn_drop_ref: ConnDropRef,
225    conn_eof: ConnEof,
226    executor: Exec,
227    h2_tx: SendRequest<SendBuf<B::Data>>,
228    req_rx: ClientRx<B>,
229    fut_ctx: Option<FutCtx<B>>,
230}
231
232impl<B> ClientTask<B>
233where
234    B: HttpBody + 'static,
235{
236    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
237        self.h2_tx.is_extended_connect_protocol_enabled()
238    }
239}
240
241impl<B> ClientTask<B>
242where
243    B: HttpBody + Send + 'static,
244    B::Data: Send,
245    B::Error: Into<Box<dyn StdError + Send + Sync>>,
246{
247    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
248        let ping = self.ping.clone();
249        let send_stream = if !f.is_connect {
250            if !f.eos {
251                let mut pipe = Box::pin(PipeToSendStream::new(f.body, f.body_tx)).map(|res| {
252                    if let Err(e) = res {
253                        debug!("client request body error: {}", e);
254                    }
255                });
256
257                // eagerly see if the body pipe is ready and
258                // can thus skip allocating in the executor
259                match Pin::new(&mut pipe).poll(cx) {
260                    Poll::Ready(_) => (),
261                    Poll::Pending => {
262                        let conn_drop_ref = self.conn_drop_ref.clone();
263                        // keep the ping recorder's knowledge of an
264                        // "open stream" alive while this body is
265                        // still sending...
266                        let ping = ping.clone();
267                        let pipe = pipe.map(move |x| {
268                            drop(conn_drop_ref);
269                            drop(ping);
270                            x
271                        });
272                        // Clear send task
273                        self.executor.execute(pipe);
274                    }
275                }
276            }
277
278            None
279        } else {
280            Some(f.body_tx)
281        };
282
283        let fut = f.fut.map(move |result| match result {
284            Ok(res) => {
285                // record that we got the response headers
286                ping.record_non_data();
287
288                let content_length = headers::content_length_parse_all(res.headers());
289                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
290                    if content_length.map_or(false, |len| len != 0) {
291                        warn!("h2 connect response with non-zero body not supported");
292
293                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
294                        return Err((
295                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
296                            None,
297                        ));
298                    }
299                    let (parts, recv_stream) = res.into_parts();
300                    let mut res = Response::from_parts(parts, Body::empty());
301
302                    let (pending, on_upgrade) = crate::upgrade::pending();
303                    let io = H2Upgraded {
304                        ping,
305                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
306                        recv_stream,
307                        buf: Bytes::new(),
308                    };
309                    let upgraded = Upgraded::new(io, Bytes::new());
310
311                    pending.fulfill(upgraded);
312                    res.extensions_mut().insert(on_upgrade);
313
314                    Ok(res)
315                } else {
316                    let res = res.map(|stream| {
317                        let ping = ping.for_stream(&stream);
318                        crate::Body::h2(stream, content_length.into(), ping)
319                    });
320                    Ok(res)
321                }
322            }
323            Err(err) => {
324                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
325
326                debug!("client response error: {}", err);
327                Err((crate::Error::new_h2(err), None))
328            }
329        });
330        self.executor.execute(f.cb.send_when(fut));
331    }
332}
333
334impl<B> Future for ClientTask<B>
335where
336    B: HttpBody + Send + 'static,
337    B::Data: Send,
338    B::Error: Into<Box<dyn StdError + Send + Sync>>,
339{
340    type Output = crate::Result<Dispatched>;
341
342    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
343        loop {
344            match ready!(self.h2_tx.poll_ready(cx)) {
345                Ok(()) => (),
346                Err(err) => {
347                    self.ping.ensure_not_timed_out()?;
348                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
349                        trace!("connection gracefully shutdown");
350                        Poll::Ready(Ok(Dispatched::Shutdown))
351                    } else {
352                        Poll::Ready(Err(crate::Error::new_h2(err)))
353                    };
354                }
355            };
356
357            match self.fut_ctx.take() {
358                // If we were waiting on pending open
359                // continue where we left off.
360                Some(f) => {
361                    self.poll_pipe(f, cx);
362                    continue;
363                }
364                None => (),
365            }
366
367            match self.req_rx.poll_recv(cx) {
368                Poll::Ready(Some((req, cb))) => {
369                    // check that future hasn't been canceled already
370                    if cb.is_canceled() {
371                        trace!("request callback is canceled");
372                        continue;
373                    }
374                    let (head, body) = req.into_parts();
375                    let mut req = ::http::Request::from_parts(head, ());
376                    super::strip_connection_headers(req.headers_mut(), true);
377                    if let Some(len) = body.size_hint().exact() {
378                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
379                            headers::set_content_length_if_missing(req.headers_mut(), len);
380                        }
381                    }
382
383                    let is_connect = req.method() == Method::CONNECT;
384                    let eos = body.is_end_stream();
385
386                    if is_connect {
387                        if headers::content_length_parse_all(req.headers())
388                            .map_or(false, |len| len != 0)
389                        {
390                            warn!("h2 connect request with non-zero body not supported");
391                            cb.send(Err((
392                                crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
393                                None,
394                            )));
395                            continue;
396                        }
397                    }
398
399                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
400                        req.extensions_mut().insert(protocol.into_inner());
401                    }
402
403                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
404                        Ok(ok) => ok,
405                        Err(err) => {
406                            debug!("client send request error: {}", err);
407                            cb.send(Err((crate::Error::new_h2(err), None)));
408                            continue;
409                        }
410                    };
411
412                    let f = FutCtx {
413                        is_connect,
414                        eos,
415                        fut,
416                        body_tx,
417                        body,
418                        cb,
419                    };
420
421                    // Check poll_ready() again.
422                    // If the call to send_request() resulted in the new stream being pending open
423                    // we have to wait for the open to complete before accepting new requests.
424                    match self.h2_tx.poll_ready(cx) {
425                        Poll::Pending => {
426                            // Save Context
427                            self.fut_ctx = Some(f);
428                            return Poll::Pending;
429                        }
430                        Poll::Ready(Ok(())) => (),
431                        Poll::Ready(Err(err)) => {
432                            f.cb.send(Err((crate::Error::new_h2(err), None)));
433                            continue;
434                        }
435                    }
436                    self.poll_pipe(f, cx);
437                    continue;
438                }
439
440                Poll::Ready(None) => {
441                    trace!("client::dispatch::Sender dropped");
442                    return Poll::Ready(Ok(Dispatched::Shutdown));
443                }
444
445                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
446                    Ok(never) => match never {},
447                    Err(_conn_is_eof) => {
448                        trace!("connection task is closed, closing dispatch task");
449                        return Poll::Ready(Ok(Dispatched::Shutdown));
450                    }
451                },
452            }
453        }
454    }
455}