// hyper/proto/h1/dispatch.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::marker::Unpin;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::{Buf, Bytes};
8use http::Request;
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::{debug, trace};
11
12use super::{Http1Transaction, Wants};
13use crate::body::{Body, DecodedLength, HttpBody};
14use crate::common;
15use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
16use crate::upgrade::OnUpgrade;
17
18pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
19    conn: Conn<I, Bs::Data, T>,
20    dispatch: D,
21    body_tx: Option<crate::body::Sender>,
22    body_rx: Pin<Box<Option<Bs>>>,
23    is_closing: bool,
24}
25
26pub(crate) trait Dispatch {
27    type PollItem;
28    type PollBody;
29    type PollError;
30    type RecvItem;
31    fn poll_msg(
32        self: Pin<&mut Self>,
33        cx: &mut Context<'_>,
34    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
35    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
36    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
37    fn should_poll(&self) -> bool;
38}
39
40cfg_server! {
41    use crate::service::HttpService;
42
43    pub(crate) struct Server<S: HttpService<B>, B> {
44        in_flight: Pin<Box<Option<S::Future>>>,
45        pub(crate) service: S,
46    }
47}
48
49cfg_client! {
50    pin_project_lite::pin_project! {
51        pub(crate) struct Client<B> {
52            callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
53            #[pin]
54            rx: ClientRx<B>,
55            rx_closed: bool,
56        }
57    }
58
59    type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
60}
61
62impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
63where
64    D: Dispatch<
65            PollItem = MessageHead<T::Outgoing>,
66            PollBody = Bs,
67            RecvItem = MessageHead<T::Incoming>,
68        > + Unpin,
69    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
70    I: AsyncRead + AsyncWrite + Unpin,
71    T: Http1Transaction + Unpin,
72    Bs: HttpBody + 'static,
73    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
74{
75    pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
76        Dispatcher {
77            conn,
78            dispatch,
79            body_tx: None,
80            body_rx: Box::pin(None),
81            is_closing: false,
82        }
83    }
84
85    #[cfg(feature = "server")]
86    pub(crate) fn disable_keep_alive(&mut self) {
87        self.conn.disable_keep_alive();
88        if self.conn.is_write_closed() {
89            self.close();
90        }
91    }
92
93    pub(crate) fn into_inner(self) -> (I, Bytes, D) {
94        let (io, buf) = self.conn.into_inner();
95        (io, buf, self.dispatch)
96    }
97
98    /// Run this dispatcher until HTTP says this connection is done,
99    /// but don't call `AsyncWrite::shutdown` on the underlying IO.
100    ///
101    /// This is useful for old-style HTTP upgrades, but ignores
102    /// newer-style upgrade API.
103    pub(crate) fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
104    where
105        Self: Unpin,
106    {
107        Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
108            if let Dispatched::Upgrade(pending) = ds {
109                pending.manual();
110            }
111        })
112    }
113
114    fn poll_catch(
115        &mut self,
116        cx: &mut Context<'_>,
117        should_shutdown: bool,
118    ) -> Poll<crate::Result<Dispatched>> {
119        Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
120            // Be sure to alert a streaming body of the failure.
121            if let Some(mut body) = self.body_tx.take() {
122                body.send_error(crate::Error::new_body("connection error"));
123            }
124            // An error means we're shutting down either way.
125            // We just try to give the error to the user,
126            // and close the connection with an Ok. If we
127            // cannot give it to the user, then return the Err.
128            self.dispatch.recv_msg(Err(e))?;
129            Ok(Dispatched::Shutdown)
130        }))
131    }
132
133    fn poll_inner(
134        &mut self,
135        cx: &mut Context<'_>,
136        should_shutdown: bool,
137    ) -> Poll<crate::Result<Dispatched>> {
138        T::update_date();
139
140        ready!(self.poll_loop(cx))?;
141
142        if self.is_done() {
143            if let Some(pending) = self.conn.pending_upgrade() {
144                self.conn.take_error()?;
145                return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
146            } else if should_shutdown {
147                ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
148            }
149            self.conn.take_error()?;
150            Poll::Ready(Ok(Dispatched::Shutdown))
151        } else {
152            Poll::Pending
153        }
154    }
155
156    fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
157        // Limit the looping on this connection, in case it is ready far too
158        // often, so that other futures don't starve.
159        //
160        // 16 was chosen arbitrarily, as that is number of pipelined requests
161        // benchmarks often use. Perhaps it should be a config option instead.
162        for _ in 0..16 {
163            let _ = self.poll_read(cx)?;
164            let _ = self.poll_write(cx)?;
165            let _ = self.poll_flush(cx)?;
166
167            // This could happen if reading paused before blocking on IO,
168            // such as getting to the end of a framed message, but then
169            // writing/flushing set the state back to Init. In that case,
170            // if the read buffer still had bytes, we'd want to try poll_read
171            // again, or else we wouldn't ever be woken up again.
172            //
173            // Using this instead of task::current() and notify() inside
174            // the Conn is noticeably faster in pipelined benchmarks.
175            if !self.conn.wants_read_again() {
176                //break;
177                return Poll::Ready(Ok(()));
178            }
179        }
180
181        trace!("poll_loop yielding (self = {:p})", self);
182
183        common::task::yield_now(cx).map(|never| match never {})
184    }
185
186    fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
187        loop {
188            if self.is_closing {
189                return Poll::Ready(Ok(()));
190            } else if self.conn.can_read_head() {
191                ready!(self.poll_read_head(cx))?;
192            } else if let Some(mut body) = self.body_tx.take() {
193                if self.conn.can_read_body() {
194                    match body.poll_ready(cx) {
195                        Poll::Ready(Ok(())) => (),
196                        Poll::Pending => {
197                            self.body_tx = Some(body);
198                            return Poll::Pending;
199                        }
200                        Poll::Ready(Err(_canceled)) => {
201                            // user doesn't care about the body
202                            // so we should stop reading
203                            trace!("body receiver dropped before eof, draining or closing");
204                            self.conn.poll_drain_or_close_read(cx);
205                            continue;
206                        }
207                    }
208                    match self.conn.poll_read_body(cx) {
209                        Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
210                            Ok(()) => {
211                                self.body_tx = Some(body);
212                            }
213                            Err(_canceled) => {
214                                if self.conn.can_read_body() {
215                                    trace!("body receiver dropped before eof, closing");
216                                    self.conn.close_read();
217                                }
218                            }
219                        },
220                        Poll::Ready(None) => {
221                            // just drop, the body will close automatically
222                        }
223                        Poll::Pending => {
224                            self.body_tx = Some(body);
225                            return Poll::Pending;
226                        }
227                        Poll::Ready(Some(Err(e))) => {
228                            body.send_error(crate::Error::new_body(e));
229                        }
230                    }
231                } else {
232                    // just drop, the body will close automatically
233                }
234            } else {
235                return self.conn.poll_read_keep_alive(cx);
236            }
237        }
238    }
239
240    fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
241        // can dispatch receive, or does it still care about, an incoming message?
242        match ready!(self.dispatch.poll_ready(cx)) {
243            Ok(()) => (),
244            Err(()) => {
245                trace!("dispatch no longer receiving messages");
246                self.close();
247                return Poll::Ready(Ok(()));
248            }
249        }
250        // dispatch is ready for a message, try to read one
251        match ready!(self.conn.poll_read_head(cx)) {
252            Some(Ok((mut head, body_len, wants))) => {
253                let body = match body_len {
254                    DecodedLength::ZERO => Body::empty(),
255                    other => {
256                        let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
257                        self.body_tx = Some(tx);
258                        rx
259                    }
260                };
261                if wants.contains(Wants::UPGRADE) {
262                    let upgrade = self.conn.on_upgrade();
263                    debug_assert!(!upgrade.is_none(), "empty upgrade");
264                    debug_assert!(
265                        head.extensions.get::<OnUpgrade>().is_none(),
266                        "OnUpgrade already set"
267                    );
268                    head.extensions.insert(upgrade);
269                }
270                self.dispatch.recv_msg(Ok((head, body)))?;
271                Poll::Ready(Ok(()))
272            }
273            Some(Err(err)) => {
274                debug!("read_head error: {}", err);
275                self.dispatch.recv_msg(Err(err))?;
276                // if here, the dispatcher gave the user the error
277                // somewhere else. we still need to shutdown, but
278                // not as a second error.
279                self.close();
280                Poll::Ready(Ok(()))
281            }
282            None => {
283                // read eof, the write side will have been closed too unless
284                // allow_read_close was set to true, in which case just do
285                // nothing...
286                debug_assert!(self.conn.is_read_closed());
287                if self.conn.is_write_closed() {
288                    self.close();
289                }
290                Poll::Ready(Ok(()))
291            }
292        }
293    }
294
295    fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
296        loop {
297            if self.is_closing {
298                return Poll::Ready(Ok(()));
299            } else if self.body_rx.is_none()
300                && self.conn.can_write_head()
301                && self.dispatch.should_poll()
302            {
303                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
304                    let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
305
306                    // Check if the body knows its full data immediately.
307                    //
308                    // If so, we can skip a bit of bookkeeping that streaming
309                    // bodies need to do.
310                    if let Some(full) = crate::body::take_full_data(&mut body) {
311                        self.conn.write_full_msg(head, full);
312                        return Poll::Ready(Ok(()));
313                    }
314
315                    let body_type = if body.is_end_stream() {
316                        self.body_rx.set(None);
317                        None
318                    } else {
319                        let btype = body
320                            .size_hint()
321                            .exact()
322                            .map(BodyLength::Known)
323                            .or_else(|| Some(BodyLength::Unknown));
324                        self.body_rx.set(Some(body));
325                        btype
326                    };
327                    self.conn.write_head(head, body_type);
328                } else {
329                    self.close();
330                    return Poll::Ready(Ok(()));
331                }
332            } else if !self.conn.can_buffer_body() {
333                ready!(self.poll_flush(cx))?;
334            } else {
335                // A new scope is needed :(
336                if let (Some(mut body), clear_body) =
337                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
338                {
339                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
340                    if !self.conn.can_write_body() {
341                        trace!(
342                            "no more write body allowed, user body is_end_stream = {}",
343                            body.is_end_stream(),
344                        );
345                        *clear_body = true;
346                        continue;
347                    }
348
349                    let item = ready!(body.as_mut().poll_data(cx));
350                    if let Some(item) = item {
351                        let chunk = item.map_err(|e| {
352                            *clear_body = true;
353                            crate::Error::new_user_body(e)
354                        })?;
355                        let eos = body.is_end_stream();
356                        if eos {
357                            *clear_body = true;
358                            if chunk.remaining() == 0 {
359                                trace!("discarding empty chunk");
360                                self.conn.end_body()?;
361                            } else {
362                                self.conn.write_body_and_end(chunk);
363                            }
364                        } else {
365                            if chunk.remaining() == 0 {
366                                trace!("discarding empty chunk");
367                                continue;
368                            }
369                            self.conn.write_body(chunk);
370                        }
371                    } else {
372                        *clear_body = true;
373                        self.conn.end_body()?;
374                    }
375                } else {
376                    // If there's no body_rx, end the body
377                    if self.conn.can_write_body() {
378                        self.conn.end_body()?;
379                    } else {
380                        return Poll::Pending;
381                    }
382                }
383            }
384        }
385    }
386
387    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
388        self.conn.poll_flush(cx).map_err(|err| {
389            debug!("error writing: {}", err);
390            crate::Error::new_body_write(err)
391        })
392    }
393
394    fn close(&mut self) {
395        self.is_closing = true;
396        self.conn.close_read();
397        self.conn.close_write();
398    }
399
400    fn is_done(&self) -> bool {
401        if self.is_closing {
402            return true;
403        }
404
405        let read_done = self.conn.is_read_closed();
406
407        if !T::should_read_first() && read_done {
408            // a client that cannot read may was well be done.
409            true
410        } else {
411            let write_done = self.conn.is_write_closed()
412                || (!self.dispatch.should_poll() && self.body_rx.is_none());
413            read_done && write_done
414        }
415    }
416}
417
418impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
419where
420    D: Dispatch<
421            PollItem = MessageHead<T::Outgoing>,
422            PollBody = Bs,
423            RecvItem = MessageHead<T::Incoming>,
424        > + Unpin,
425    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
426    I: AsyncRead + AsyncWrite + Unpin,
427    T: Http1Transaction + Unpin,
428    Bs: HttpBody + 'static,
429    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
430{
431    type Output = crate::Result<Dispatched>;
432
433    #[inline]
434    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
435        self.poll_catch(cx, true)
436    }
437}
438
439// ===== impl OptGuard =====
440
/// Drop guard over a pinned `Option`: hands out mutable access to the
/// contained value together with a flag that decides whether the `Option`
/// is reset to `None` when the guard is dropped.
struct OptGuard<'a, T> {
    /// The guarded slot.
    slot: Pin<&'a mut Option<T>>,
    /// When `true` at drop time, `slot` is set to `None`.
    clear_on_drop: bool,
}

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin`; by default the value is kept on drop.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self {
            slot: pin,
            clear_on_drop: false,
        }
    }

    /// Returns the pinned inner value (if any) and the clear-on-drop flag.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        let Self {
            slot,
            clear_on_drop,
        } = self;
        (slot.as_mut().as_pin_mut(), clear_on_drop)
    }
}

impl<'a, T> Drop for OptGuard<'a, T> {
    fn drop(&mut self) {
        if !self.clear_on_drop {
            return;
        }
        self.slot.set(None);
    }
}
462
463// ===== impl Server =====
464
465cfg_server! {
466    impl<S, B> Server<S, B>
467    where
468        S: HttpService<B>,
469    {
470        pub(crate) fn new(service: S) -> Server<S, B> {
471            Server {
472                in_flight: Box::pin(None),
473                service,
474            }
475        }
476
477        pub(crate) fn into_service(self) -> S {
478            self.service
479        }
480    }
481
482    // Service is never pinned
483    impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
484
485    impl<S, Bs> Dispatch for Server<S, Body>
486    where
487        S: HttpService<Body, ResBody = Bs>,
488        S::Error: Into<Box<dyn StdError + Send + Sync>>,
489        Bs: HttpBody,
490    {
491        type PollItem = MessageHead<http::StatusCode>;
492        type PollBody = Bs;
493        type PollError = S::Error;
494        type RecvItem = RequestHead;
495
496        fn poll_msg(
497            mut self: Pin<&mut Self>,
498            cx: &mut Context<'_>,
499        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
500            let mut this = self.as_mut();
501            let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
502                let resp = ready!(fut.as_mut().poll(cx)?);
503                let (parts, body) = resp.into_parts();
504                let head = MessageHead {
505                    version: parts.version,
506                    subject: parts.status,
507                    headers: parts.headers,
508                    extensions: parts.extensions,
509                };
510                Poll::Ready(Some(Ok((head, body))))
511            } else {
512                unreachable!("poll_msg shouldn't be called if no inflight");
513            };
514
515            // Since in_flight finished, remove it
516            this.in_flight.set(None);
517            ret
518        }
519
520        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
521            let (msg, body) = msg?;
522            let mut req = Request::new(body);
523            *req.method_mut() = msg.subject.0;
524            *req.uri_mut() = msg.subject.1;
525            *req.headers_mut() = msg.headers;
526            *req.version_mut() = msg.version;
527            *req.extensions_mut() = msg.extensions;
528            let fut = self.service.call(req);
529            self.in_flight.set(Some(fut));
530            Ok(())
531        }
532
533        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
534            if self.in_flight.is_some() {
535                Poll::Pending
536            } else {
537                self.service.poll_ready(cx).map_err(|_e| {
538                    // FIXME: return error value.
539                    trace!("service closed");
540                })
541            }
542        }
543
544        fn should_poll(&self) -> bool {
545            self.in_flight.is_some()
546        }
547    }
548}
549
550// ===== impl Client =====
551
552cfg_client! {
553    impl<B> Client<B> {
554        pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
555            Client {
556                callback: None,
557                rx,
558                rx_closed: false,
559            }
560        }
561    }
562
563    impl<B> Dispatch for Client<B>
564    where
565        B: HttpBody,
566    {
567        type PollItem = RequestHead;
568        type PollBody = B;
569        type PollError = std::convert::Infallible;
570        type RecvItem = crate::proto::ResponseHead;
571
572        fn poll_msg(
573            mut self: Pin<&mut Self>,
574            cx: &mut Context<'_>,
575        ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
576            let mut this = self.as_mut();
577            debug_assert!(!this.rx_closed);
578            match this.rx.poll_recv(cx) {
579                Poll::Ready(Some((req, mut cb))) => {
580                    // check that future hasn't been canceled already
581                    match cb.poll_canceled(cx) {
582                        Poll::Ready(()) => {
583                            trace!("request canceled");
584                            Poll::Ready(None)
585                        }
586                        Poll::Pending => {
587                            let (parts, body) = req.into_parts();
588                            let head = RequestHead {
589                                version: parts.version,
590                                subject: crate::proto::RequestLine(parts.method, parts.uri),
591                                headers: parts.headers,
592                                extensions: parts.extensions,
593                            };
594                            this.callback = Some(cb);
595                            Poll::Ready(Some(Ok((head, body))))
596                        }
597                    }
598                }
599                Poll::Ready(None) => {
600                    // user has dropped sender handle
601                    trace!("client tx closed");
602                    this.rx_closed = true;
603                    Poll::Ready(None)
604                }
605                Poll::Pending => Poll::Pending,
606            }
607        }
608
609        fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
610            match msg {
611                Ok((msg, body)) => {
612                    if let Some(cb) = self.callback.take() {
613                        let res = msg.into_response(body);
614                        cb.send(Ok(res));
615                        Ok(())
616                    } else {
617                        // Getting here is likely a bug! An error should have happened
618                        // in Conn::require_empty_read() before ever parsing a
619                        // full message!
620                        Err(crate::Error::new_unexpected_message())
621                    }
622                }
623                Err(err) => {
624                    if let Some(cb) = self.callback.take() {
625                        cb.send(Err((err, None)));
626                        Ok(())
627                    } else if !self.rx_closed {
628                        self.rx.close();
629                        if let Some((req, cb)) = self.rx.try_recv() {
630                            trace!("canceling queued request with connection error: {}", err);
631                            // in this case, the message was never even started, so it's safe to tell
632                            // the user that the request was completely canceled
633                            cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
634                            Ok(())
635                        } else {
636                            Err(err)
637                        }
638                    } else {
639                        Err(err)
640                    }
641                }
642            }
643        }
644
645        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
646            match self.callback {
647                Some(ref mut cb) => match cb.poll_canceled(cx) {
648                    Poll::Ready(()) => {
649                        trace!("callback receiver has dropped");
650                        Poll::Ready(Err(()))
651                    }
652                    Poll::Pending => Poll::Ready(Ok(())),
653                },
654                None => Poll::Ready(Err(())),
655            }
656        }
657
658        fn should_poll(&self) -> bool {
659            self.callback.is_none()
660        }
661    }
662}
663
664#[cfg(test)]
665mod tests {
666    use super::*;
667    use crate::proto::h1::ClientTransaction;
668    use std::time::Duration;
669
670    #[test]
671    fn client_read_bytes_before_writing_request() {
672        let _ = pretty_env_logger::try_init();
673
674        tokio_test::task::spawn(()).enter(|cx, _| {
675            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
676
677            // Block at 0 for now, but we will release this response before
678            // the request is ready to write later...
679            let (mut tx, rx) = crate::client::dispatch::channel();
680            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
681            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
682
683            // First poll is needed to allow tx to send...
684            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
685
686            // Unblock our IO, which has a response before we've sent request!
687            //
688            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
689
690            let mut res_rx = tx
691                .try_send(crate::Request::new(crate::Body::empty()))
692                .unwrap();
693
694            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
695            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
696                .expect_err("callback should send error");
697
698            match (err.0.kind(), err.1) {
699                (&crate::error::Kind::Canceled, Some(_)) => (),
700                other => panic!("expected Canceled, got {:?}", other),
701            }
702        });
703    }
704
705    #[tokio::test]
706    async fn client_flushing_is_not_ready_for_next_request() {
707        let _ = pretty_env_logger::try_init();
708
709        let (io, _handle) = tokio_test::io::Builder::new()
710            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
711            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
712            .wait(std::time::Duration::from_secs(2))
713            .build_with_handle();
714
715        let (mut tx, rx) = crate::client::dispatch::channel();
716        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
717        conn.set_write_strategy_queue();
718
719        let dispatcher = Dispatcher::new(Client::new(rx), conn);
720        let _dispatcher = tokio::spawn(async move { dispatcher.await });
721
722        let req = crate::Request::builder()
723            .method("POST")
724            .body(crate::Body::from("reee"))
725            .unwrap();
726
727        let res = tx.try_send(req).unwrap().await.expect("response");
728        drop(res);
729
730        assert!(!tx.is_ready());
731    }
732
733    #[tokio::test]
734    async fn body_empty_chunks_ignored() {
735        let _ = pretty_env_logger::try_init();
736
737        let io = tokio_test::io::Builder::new()
738            // no reading or writing, just be blocked for the test...
739            .wait(Duration::from_secs(5))
740            .build();
741
742        let (mut tx, rx) = crate::client::dispatch::channel();
743        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
744        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
745
746        // First poll is needed to allow tx to send...
747        assert!(dispatcher.poll().is_pending());
748
749        let body = {
750            let (mut tx, body) = crate::Body::channel();
751            tx.try_send_data("".into()).unwrap();
752            body
753        };
754
755        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
756
757        // Ensure conn.write_body wasn't called with the empty chunk.
758        // If it is, it will trigger an assertion.
759        assert!(dispatcher.poll().is_pending());
760    }
761}