hyper/proto/h2/
mod.rs

1use bytes::{Buf, Bytes};
2use h2::{Reason, RecvStream, SendStream};
3use http::header::{HeaderName, CONNECTION, TE, TRAILER, TRANSFER_ENCODING, UPGRADE};
4use http::HeaderMap;
5use pin_project_lite::pin_project;
6use std::error::Error as StdError;
7use std::future::Future;
8use std::io::{self, Cursor, IoSlice};
9use std::mem;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13use tracing::{debug, trace, warn};
14
15use crate::body::HttpBody;
16use crate::proto::h2::ping::Recorder;
17
18pub(crate) mod ping;
19
20cfg_client! {
21    pub(crate) mod client;
22    pub(crate) use self::client::ClientTask;
23}
24
25cfg_server! {
26    pub(crate) mod server;
27    pub(crate) use self::server::Server;
28}
29
/// Default initial flow-control window size for a stream (65,535), as defined by the
/// HTTP/2 specification.
pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535;
32
33fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
34    // List of connection headers from:
35    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
36    //
37    // TE headers are allowed in HTTP/2 requests as long as the value is "trailers", so they're
38    // tested separately.
39    let connection_headers = [
40        HeaderName::from_lowercase(b"keep-alive").unwrap(),
41        HeaderName::from_lowercase(b"proxy-connection").unwrap(),
42        TRAILER,
43        TRANSFER_ENCODING,
44        UPGRADE,
45    ];
46
47    for header in connection_headers.iter() {
48        if headers.remove(header).is_some() {
49            warn!("Connection header illegal in HTTP/2: {}", header.as_str());
50        }
51    }
52
53    if is_request {
54        if headers
55            .get(TE)
56            .map(|te_header| te_header != "trailers")
57            .unwrap_or(false)
58        {
59            warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
60            headers.remove(TE);
61        }
62    } else if headers.remove(TE).is_some() {
63        warn!("TE headers illegal in HTTP/2 responses");
64    }
65
66    if let Some(header) = headers.remove(CONNECTION) {
67        warn!(
68            "Connection header illegal in HTTP/2: {}",
69            CONNECTION.as_str()
70        );
71        let header_contents = header.to_str().unwrap();
72
73        // A `Connection` header may have a comma-separated list of names of other headers that
74        // are meant for only this specific connection.
75        //
76        // Iterate these names and remove them as headers. Connection-specific headers are
77        // forbidden in HTTP2, as that information has been moved into frame types of the h2
78        // protocol.
79        for name in header_contents.split(',') {
80            let name = name.trim();
81            headers.remove(name);
82        }
83    }
84}
85
86// body adapters used by both Client and Server
87
88pin_project! {
89    struct PipeToSendStream<S>
90    where
91        S: HttpBody,
92    {
93        body_tx: SendStream<SendBuf<S::Data>>,
94        data_done: bool,
95        #[pin]
96        stream: S,
97    }
98}
99
100impl<S> PipeToSendStream<S>
101where
102    S: HttpBody,
103{
104    fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
105        PipeToSendStream {
106            body_tx: tx,
107            data_done: false,
108            stream,
109        }
110    }
111}
112
113impl<S> Future for PipeToSendStream<S>
114where
115    S: HttpBody,
116    S::Error: Into<Box<dyn StdError + Send + Sync>>,
117{
118    type Output = crate::Result<()>;
119
120    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
121        let mut me = self.project();
122        loop {
123            if !*me.data_done {
124                // we don't have the next chunk of data yet, so just reserve 1 byte to make
125                // sure there's some capacity available. h2 will handle the capacity management
126                // for the actual body chunk.
127                me.body_tx.reserve_capacity(1);
128
129                if me.body_tx.capacity() == 0 {
130                    loop {
131                        match ready!(me.body_tx.poll_capacity(cx)) {
132                            Some(Ok(0)) => {}
133                            Some(Ok(_)) => break,
134                            Some(Err(e)) => {
135                                return Poll::Ready(Err(crate::Error::new_body_write(e)))
136                            }
137                            None => {
138                                // None means the stream is no longer in a
139                                // streaming state, we either finished it
140                                // somehow, or the remote reset us.
141                                return Poll::Ready(Err(crate::Error::new_body_write(
142                                    "send stream capacity unexpectedly closed",
143                                )));
144                            }
145                        }
146                    }
147                } else if let Poll::Ready(reason) = me
148                    .body_tx
149                    .poll_reset(cx)
150                    .map_err(crate::Error::new_body_write)?
151                {
152                    debug!("stream received RST_STREAM: {:?}", reason);
153                    return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
154                        reason,
155                    ))));
156                }
157
158                match ready!(me.stream.as_mut().poll_data(cx)) {
159                    Some(Ok(chunk)) => {
160                        let is_eos = me.stream.is_end_stream();
161                        trace!(
162                            "send body chunk: {} bytes, eos={}",
163                            chunk.remaining(),
164                            is_eos,
165                        );
166
167                        let buf = SendBuf::Buf(chunk);
168                        me.body_tx
169                            .send_data(buf, is_eos)
170                            .map_err(crate::Error::new_body_write)?;
171
172                        if is_eos {
173                            return Poll::Ready(Ok(()));
174                        }
175                    }
176                    Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
177                    None => {
178                        me.body_tx.reserve_capacity(0);
179                        let is_eos = me.stream.is_end_stream();
180                        if is_eos {
181                            return Poll::Ready(me.body_tx.send_eos_frame());
182                        } else {
183                            *me.data_done = true;
184                            // loop again to poll_trailers
185                        }
186                    }
187                }
188            } else {
189                if let Poll::Ready(reason) = me
190                    .body_tx
191                    .poll_reset(cx)
192                    .map_err(crate::Error::new_body_write)?
193                {
194                    debug!("stream received RST_STREAM: {:?}", reason);
195                    return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(
196                        reason,
197                    ))));
198                }
199
200                match ready!(me.stream.poll_trailers(cx)) {
201                    Ok(Some(trailers)) => {
202                        me.body_tx
203                            .send_trailers(trailers)
204                            .map_err(crate::Error::new_body_write)?;
205                        return Poll::Ready(Ok(()));
206                    }
207                    Ok(None) => {
208                        // There were no trailers, so send an empty DATA frame...
209                        return Poll::Ready(me.body_tx.send_eos_frame());
210                    }
211                    Err(e) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
212                }
213            }
214        }
215    }
216}
217
218trait SendStreamExt {
219    fn on_user_err<E>(&mut self, err: E) -> crate::Error
220    where
221        E: Into<Box<dyn std::error::Error + Send + Sync>>;
222    fn send_eos_frame(&mut self) -> crate::Result<()>;
223}
224
225impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
226    fn on_user_err<E>(&mut self, err: E) -> crate::Error
227    where
228        E: Into<Box<dyn std::error::Error + Send + Sync>>,
229    {
230        let err = crate::Error::new_user_body(err);
231        debug!("send body user stream error: {}", err);
232        self.send_reset(err.h2_reason());
233        err
234    }
235
236    fn send_eos_frame(&mut self) -> crate::Result<()> {
237        trace!("send body eos");
238        self.send_data(SendBuf::None, true)
239            .map_err(crate::Error::new_body_write)
240    }
241}
242
/// Buffer type handed to an h2 `SendStream` as the payload of a data frame.
// NOTE(review): `repr(usize)` presumably fixes the discriminant layout so that the
// transmute between `SendBuf<B>` and `SendBuf<Neutered<B>>` in `UpgradedSendStream::new`
// is well-defined — confirm before changing it.
#[repr(usize)]
enum SendBuf<B> {
    /// A chunk produced by the body being sent.
    Buf(B),
    /// An owned copy of a byte slice, as created by `UpgradedSendStream::write`.
    Cursor(Cursor<Box<[u8]>>),
    /// No payload; used for the empty end-of-stream frame.
    None,
}
249
250impl<B: Buf> Buf for SendBuf<B> {
251    #[inline]
252    fn remaining(&self) -> usize {
253        match *self {
254            Self::Buf(ref b) => b.remaining(),
255            Self::Cursor(ref c) => Buf::remaining(c),
256            Self::None => 0,
257        }
258    }
259
260    #[inline]
261    fn chunk(&self) -> &[u8] {
262        match *self {
263            Self::Buf(ref b) => b.chunk(),
264            Self::Cursor(ref c) => c.chunk(),
265            Self::None => &[],
266        }
267    }
268
269    #[inline]
270    fn advance(&mut self, cnt: usize) {
271        match *self {
272            Self::Buf(ref mut b) => b.advance(cnt),
273            Self::Cursor(ref mut c) => c.advance(cnt),
274            Self::None => {}
275        }
276    }
277
278    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
279        match *self {
280            Self::Buf(ref b) => b.chunks_vectored(dst),
281            Self::Cursor(ref c) => c.chunks_vectored(dst),
282            Self::None => 0,
283        }
284    }
285}
286
/// Byte-stream view of an h2 stream, readable and writable through the
/// `AsyncRead` / `AsyncWrite` implementations on this type.
struct H2Upgraded<B>
where
    B: Buf,
{
    /// Told about the length of every data chunk received on `recv_stream`.
    ping: Recorder,
    /// Sending half of the stream.
    send_stream: UpgradedSendStream<B>,
    /// Receiving half of the stream.
    recv_stream: RecvStream,
    /// Received data that has not yet been copied out to a reader.
    buf: Bytes,
}
296
297impl<B> AsyncRead for H2Upgraded<B>
298where
299    B: Buf,
300{
301    fn poll_read(
302        mut self: Pin<&mut Self>,
303        cx: &mut Context<'_>,
304        read_buf: &mut ReadBuf<'_>,
305    ) -> Poll<Result<(), io::Error>> {
306        if self.buf.is_empty() {
307            self.buf = loop {
308                match ready!(self.recv_stream.poll_data(cx)) {
309                    None => return Poll::Ready(Ok(())),
310                    Some(Ok(buf)) if buf.is_empty() && !self.recv_stream.is_end_stream() => {
311                        continue
312                    }
313                    Some(Ok(buf)) => {
314                        self.ping.record_data(buf.len());
315                        break buf;
316                    }
317                    Some(Err(e)) => {
318                        return Poll::Ready(match e.reason() {
319                            Some(Reason::NO_ERROR) | Some(Reason::CANCEL) => Ok(()),
320                            Some(Reason::STREAM_CLOSED) => {
321                                Err(io::Error::new(io::ErrorKind::BrokenPipe, e))
322                            }
323                            _ => Err(h2_to_io_error(e)),
324                        })
325                    }
326                }
327            };
328        }
329        let cnt = std::cmp::min(self.buf.len(), read_buf.remaining());
330        read_buf.put_slice(&self.buf[..cnt]);
331        self.buf.advance(cnt);
332        let _ = self.recv_stream.flow_control().release_capacity(cnt);
333        Poll::Ready(Ok(()))
334    }
335}
336
337impl<B> AsyncWrite for H2Upgraded<B>
338where
339    B: Buf,
340{
341    fn poll_write(
342        mut self: Pin<&mut Self>,
343        cx: &mut Context<'_>,
344        buf: &[u8],
345    ) -> Poll<Result<usize, io::Error>> {
346        if buf.is_empty() {
347            return Poll::Ready(Ok(0));
348        }
349        self.send_stream.reserve_capacity(buf.len());
350
351        // We ignore all errors returned by `poll_capacity` and `write`, as we
352        // will get the correct from `poll_reset` anyway.
353        let cnt = match ready!(self.send_stream.poll_capacity(cx)) {
354            None => Some(0),
355            Some(Ok(cnt)) => self
356                .send_stream
357                .write(&buf[..cnt], false)
358                .ok()
359                .map(|()| cnt),
360            Some(Err(_)) => None,
361        };
362
363        if let Some(cnt) = cnt {
364            return Poll::Ready(Ok(cnt));
365        }
366
367        Poll::Ready(Err(h2_to_io_error(
368            match ready!(self.send_stream.poll_reset(cx)) {
369                Ok(Reason::NO_ERROR) | Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
370                    return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
371                }
372                Ok(reason) => reason.into(),
373                Err(e) => e,
374            },
375        )))
376    }
377
378    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
379        Poll::Ready(Ok(()))
380    }
381
382    fn poll_shutdown(
383        mut self: Pin<&mut Self>,
384        cx: &mut Context<'_>,
385    ) -> Poll<Result<(), io::Error>> {
386        if self.send_stream.write(&[], true).is_ok() {
387            return Poll::Ready(Ok(()));
388        }
389
390        Poll::Ready(Err(h2_to_io_error(
391            match ready!(self.send_stream.poll_reset(cx)) {
392                Ok(Reason::NO_ERROR) => return Poll::Ready(Ok(())),
393                Ok(Reason::CANCEL) | Ok(Reason::STREAM_CLOSED) => {
394                    return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()))
395                }
396                Ok(reason) => reason.into(),
397                Err(e) => e,
398            },
399        )))
400    }
401}
402
403fn h2_to_io_error(e: h2::Error) -> io::Error {
404    if e.is_io() {
405        e.into_io().unwrap()
406    } else {
407        io::Error::new(io::ErrorKind::Other, e)
408    }
409}
410
411struct UpgradedSendStream<B>(SendStream<SendBuf<Neutered<B>>>);
412
413impl<B> UpgradedSendStream<B>
414where
415    B: Buf,
416{
417    unsafe fn new(inner: SendStream<SendBuf<B>>) -> Self {
418        assert_eq!(mem::size_of::<B>(), mem::size_of::<Neutered<B>>());
419        Self(mem::transmute(inner))
420    }
421
422    fn reserve_capacity(&mut self, cnt: usize) {
423        unsafe { self.as_inner_unchecked().reserve_capacity(cnt) }
424    }
425
426    fn poll_capacity(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<usize, h2::Error>>> {
427        unsafe { self.as_inner_unchecked().poll_capacity(cx) }
428    }
429
430    fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<h2::Reason, h2::Error>> {
431        unsafe { self.as_inner_unchecked().poll_reset(cx) }
432    }
433
434    fn write(&mut self, buf: &[u8], end_of_stream: bool) -> Result<(), io::Error> {
435        let send_buf = SendBuf::Cursor(Cursor::new(buf.into()));
436        unsafe {
437            self.as_inner_unchecked()
438                .send_data(send_buf, end_of_stream)
439                .map_err(h2_to_io_error)
440        }
441    }
442
443    unsafe fn as_inner_unchecked(&mut self) -> &mut SendStream<SendBuf<B>> {
444        &mut *(&mut self.0 as *mut _ as *mut _)
445    }
446}
447
/// Stand-in for `B` of which no value can ever be constructed, because it also holds a
/// field of the uninhabited type `Impossible`.
///
/// `repr(transparent)` gives it the same layout as its `B` field.
#[repr(transparent)]
struct Neutered<B> {
    // Never read; it is the single non-zero-sized field that fixes the layout.
    _inner: B,
    // Uninhabited marker: matching on it proves a code path is unreachable.
    impossible: Impossible,
}
453
/// Uninhabited type: it has no variants, so no value of it can exist.
enum Impossible {}
455
// SAFETY: NOTE(review) — presumably sound because a `Neutered<B>` value can never exist
// (it contains the uninhabited `Impossible`), so no `B` is ever actually moved to another
// thread through it — confirm.
unsafe impl<B> Send for Neutered<B> {}
457
/// `Buf` implementation whose methods can never run: each body matches on the
/// uninhabited `impossible` field, so no arm is needed and nothing is returned.
impl<B> Buf for Neutered<B> {
    fn remaining(&self) -> usize {
        // Empty match on an uninhabited value: this body is unreachable.
        match self.impossible {}
    }

    fn chunk(&self) -> &[u8] {
        // Empty match on an uninhabited value: this body is unreachable.
        match self.impossible {}
    }

    fn advance(&mut self, _cnt: usize) {
        // Empty match on an uninhabited value: this body is unreachable.
        match self.impossible {}
    }
}