hyper/proto/h1/
conn.rs

1use std::fmt;
2use std::io;
3use std::marker::PhantomData;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7#[cfg(all(feature = "server", feature = "runtime"))]
8use std::time::Duration;
9
10use bytes::{Buf, Bytes};
11use http::header::{HeaderValue, CONNECTION};
12use http::{HeaderMap, Method, Version};
13use httparse::ParserConfig;
14use tokio::io::{AsyncRead, AsyncWrite};
15#[cfg(all(feature = "server", feature = "runtime"))]
16use tokio::time::Sleep;
17use tracing::{debug, error, trace};
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22use crate::headers::connection_keep_alive;
23use crate::proto::{BodyLength, MessageHead};
24
25const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
26
27/// This handles a connection, which will have been established over an
28/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
29/// `Transaction`s over HTTP.
30///
31/// The connection will determine when a message begins and ends as well as
32/// determine if this connection can be kept alive after the message,
33/// or if it is complete.
34pub(crate) struct Conn<I, B, T> {
35    io: Buffered<I, EncodedBuf<B>>,
36    state: State,
37    _marker: PhantomData<fn(T)>,
38}
39
40impl<I, B, T> Conn<I, B, T>
41where
42    I: AsyncRead + AsyncWrite + Unpin,
43    B: Buf,
44    T: Http1Transaction,
45{
46    pub(crate) fn new(io: I) -> Conn<I, B, T> {
47        Conn {
48            io: Buffered::new(io),
49            state: State {
50                allow_half_close: false,
51                cached_headers: None,
52                error: None,
53                keep_alive: KA::Busy,
54                method: None,
55                h1_parser_config: ParserConfig::default(),
56                #[cfg(all(feature = "server", feature = "runtime"))]
57                h1_header_read_timeout: None,
58                #[cfg(all(feature = "server", feature = "runtime"))]
59                h1_header_read_timeout_fut: None,
60                #[cfg(all(feature = "server", feature = "runtime"))]
61                h1_header_read_timeout_running: false,
62                preserve_header_case: false,
63                #[cfg(feature = "ffi")]
64                preserve_header_order: false,
65                title_case_headers: false,
66                h09_responses: false,
67                #[cfg(feature = "ffi")]
68                on_informational: None,
69                #[cfg(feature = "ffi")]
70                raw_headers: false,
71                notify_read: false,
72                reading: Reading::Init,
73                writing: Writing::Init,
74                upgrade: None,
75                // We assume a modern world where the remote speaks HTTP/1.1.
76                // If they tell us otherwise, we'll downgrade in `read_head`.
77                version: Version::HTTP_11,
78            },
79            _marker: PhantomData,
80        }
81    }
82
83    #[cfg(feature = "server")]
84    pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
85        self.io.set_flush_pipeline(enabled);
86    }
87
88    pub(crate) fn set_write_strategy_queue(&mut self) {
89        self.io.set_write_strategy_queue();
90    }
91
92    pub(crate) fn set_max_buf_size(&mut self, max: usize) {
93        self.io.set_max_buf_size(max);
94    }
95
96    #[cfg(feature = "client")]
97    pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
98        self.io.set_read_buf_exact_size(sz);
99    }
100
101    pub(crate) fn set_write_strategy_flatten(&mut self) {
102        self.io.set_write_strategy_flatten();
103    }
104
105    #[cfg(feature = "client")]
106    pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
107        self.state.h1_parser_config = parser_config;
108    }
109
110    pub(crate) fn set_title_case_headers(&mut self) {
111        self.state.title_case_headers = true;
112    }
113
114    pub(crate) fn set_preserve_header_case(&mut self) {
115        self.state.preserve_header_case = true;
116    }
117
118    #[cfg(feature = "ffi")]
119    pub(crate) fn set_preserve_header_order(&mut self) {
120        self.state.preserve_header_order = true;
121    }
122
123    #[cfg(feature = "client")]
124    pub(crate) fn set_h09_responses(&mut self) {
125        self.state.h09_responses = true;
126    }
127
128    #[cfg(all(feature = "server", feature = "runtime"))]
129    pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
130        self.state.h1_header_read_timeout = Some(val);
131    }
132
133    #[cfg(feature = "server")]
134    pub(crate) fn set_allow_half_close(&mut self) {
135        self.state.allow_half_close = true;
136    }
137
138    #[cfg(feature = "ffi")]
139    pub(crate) fn set_raw_headers(&mut self, enabled: bool) {
140        self.state.raw_headers = enabled;
141    }
142
143    pub(crate) fn into_inner(self) -> (I, Bytes) {
144        self.io.into_inner()
145    }
146
147    pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
148        self.state.upgrade.take()
149    }
150
151    pub(crate) fn is_read_closed(&self) -> bool {
152        self.state.is_read_closed()
153    }
154
155    pub(crate) fn is_write_closed(&self) -> bool {
156        self.state.is_write_closed()
157    }
158
159    pub(crate) fn can_read_head(&self) -> bool {
160        if !matches!(self.state.reading, Reading::Init) {
161            return false;
162        }
163
164        if T::should_read_first() {
165            return true;
166        }
167
168        !matches!(self.state.writing, Writing::Init)
169    }
170
171    pub(crate) fn can_read_body(&self) -> bool {
172        match self.state.reading {
173            Reading::Body(..) | Reading::Continue(..) => true,
174            _ => false,
175        }
176    }
177
178    fn should_error_on_eof(&self) -> bool {
179        // If we're idle, it's probably just the connection closing gracefully.
180        T::should_error_on_parse_eof() && !self.state.is_idle()
181    }
182
183    fn has_h2_prefix(&self) -> bool {
184        let read_buf = self.io.read_buf();
185        read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
186    }
187
188    pub(super) fn poll_read_head(
189        &mut self,
190        cx: &mut Context<'_>,
191    ) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
192        debug_assert!(self.can_read_head());
193        trace!("Conn::read_head");
194
195        let msg = match ready!(self.io.parse::<T>(
196            cx,
197            ParseContext {
198                cached_headers: &mut self.state.cached_headers,
199                req_method: &mut self.state.method,
200                h1_parser_config: self.state.h1_parser_config.clone(),
201                #[cfg(all(feature = "server", feature = "runtime"))]
202                h1_header_read_timeout: self.state.h1_header_read_timeout,
203                #[cfg(all(feature = "server", feature = "runtime"))]
204                h1_header_read_timeout_fut: &mut self.state.h1_header_read_timeout_fut,
205                #[cfg(all(feature = "server", feature = "runtime"))]
206                h1_header_read_timeout_running: &mut self.state.h1_header_read_timeout_running,
207                preserve_header_case: self.state.preserve_header_case,
208                #[cfg(feature = "ffi")]
209                preserve_header_order: self.state.preserve_header_order,
210                h09_responses: self.state.h09_responses,
211                #[cfg(feature = "ffi")]
212                on_informational: &mut self.state.on_informational,
213                #[cfg(feature = "ffi")]
214                raw_headers: self.state.raw_headers,
215            }
216        )) {
217            Ok(msg) => msg,
218            Err(e) => return self.on_read_head_error(e),
219        };
220
221        // Note: don't deconstruct `msg` into local variables, it appears
222        // the optimizer doesn't remove the extra copies.
223
224        debug!("incoming body is {}", msg.decode);
225
226        // Prevent accepting HTTP/0.9 responses after the initial one, if any.
227        self.state.h09_responses = false;
228
229        // Drop any OnInformational callbacks, we're done there!
230        #[cfg(feature = "ffi")]
231        {
232            self.state.on_informational = None;
233        }
234
235        self.state.busy();
236        self.state.keep_alive &= msg.keep_alive;
237        self.state.version = msg.head.version;
238
239        let mut wants = if msg.wants_upgrade {
240            Wants::UPGRADE
241        } else {
242            Wants::EMPTY
243        };
244
245        if msg.decode == DecodedLength::ZERO {
246            if msg.expect_continue {
247                debug!("ignoring expect-continue since body is empty");
248            }
249            self.state.reading = Reading::KeepAlive;
250            if !T::should_read_first() {
251                self.try_keep_alive(cx);
252            }
253        } else if msg.expect_continue {
254            self.state.reading = Reading::Continue(Decoder::new(msg.decode));
255            wants = wants.add(Wants::EXPECT);
256        } else {
257            self.state.reading = Reading::Body(Decoder::new(msg.decode));
258        }
259
260        Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
261    }
262
263    fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
264        // If we are currently waiting on a message, then an empty
265        // message should be reported as an error. If not, it is just
266        // the connection closing gracefully.
267        let must_error = self.should_error_on_eof();
268        self.close_read();
269        self.io.consume_leading_lines();
270        let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
271        if was_mid_parse || must_error {
272            // We check if the buf contains the h2 Preface
273            debug!(
274                "parse error ({}) with {} bytes",
275                e,
276                self.io.read_buf().len()
277            );
278            match self.on_parse_error(e) {
279                Ok(()) => Poll::Pending, // XXX: wat?
280                Err(e) => Poll::Ready(Some(Err(e))),
281            }
282        } else {
283            debug!("read eof");
284            self.close_write();
285            Poll::Ready(None)
286        }
287    }
288
289    pub(crate) fn poll_read_body(
290        &mut self,
291        cx: &mut Context<'_>,
292    ) -> Poll<Option<io::Result<Bytes>>> {
293        debug_assert!(self.can_read_body());
294
295        let (reading, ret) = match self.state.reading {
296            Reading::Body(ref mut decoder) => {
297                match ready!(decoder.decode(cx, &mut self.io)) {
298                    Ok(slice) => {
299                        let (reading, chunk) = if decoder.is_eof() {
300                            debug!("incoming body completed");
301                            (
302                                Reading::KeepAlive,
303                                if !slice.is_empty() {
304                                    Some(Ok(slice))
305                                } else {
306                                    None
307                                },
308                            )
309                        } else if slice.is_empty() {
310                            error!("incoming body unexpectedly ended");
311                            // This should be unreachable, since all 3 decoders
312                            // either set eof=true or return an Err when reading
313                            // an empty slice...
314                            (Reading::Closed, None)
315                        } else {
316                            return Poll::Ready(Some(Ok(slice)));
317                        };
318                        (reading, Poll::Ready(chunk))
319                    }
320                    Err(e) => {
321                        debug!("incoming body decode error: {}", e);
322                        (Reading::Closed, Poll::Ready(Some(Err(e))))
323                    }
324                }
325            }
326            Reading::Continue(ref decoder) => {
327                // Write the 100 Continue if not already responded...
328                if let Writing::Init = self.state.writing {
329                    trace!("automatically sending 100 Continue");
330                    let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
331                    self.io.headers_buf().extend_from_slice(cont);
332                }
333
334                // And now recurse once in the Reading::Body state...
335                self.state.reading = Reading::Body(decoder.clone());
336                return self.poll_read_body(cx);
337            }
338            _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
339        };
340
341        self.state.reading = reading;
342        self.try_keep_alive(cx);
343        ret
344    }
345
346    pub(crate) fn wants_read_again(&mut self) -> bool {
347        let ret = self.state.notify_read;
348        self.state.notify_read = false;
349        ret
350    }
351
352    pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
353        debug_assert!(!self.can_read_head() && !self.can_read_body());
354
355        if self.is_read_closed() {
356            Poll::Pending
357        } else if self.is_mid_message() {
358            self.mid_message_detect_eof(cx)
359        } else {
360            self.require_empty_read(cx)
361        }
362    }
363
364    fn is_mid_message(&self) -> bool {
365        !matches!(
366            (&self.state.reading, &self.state.writing),
367            (&Reading::Init, &Writing::Init)
368        )
369    }
370
371    // This will check to make sure the io object read is empty.
372    //
373    // This should only be called for Clients wanting to enter the idle
374    // state.
375    fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
376        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
377        debug_assert!(!self.is_mid_message());
378        debug_assert!(T::is_client());
379
380        if !self.io.read_buf().is_empty() {
381            debug!("received an unexpected {} bytes", self.io.read_buf().len());
382            return Poll::Ready(Err(crate::Error::new_unexpected_message()));
383        }
384
385        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
386
387        if num_read == 0 {
388            let ret = if self.should_error_on_eof() {
389                trace!("found unexpected EOF on busy connection: {:?}", self.state);
390                Poll::Ready(Err(crate::Error::new_incomplete()))
391            } else {
392                trace!("found EOF on idle connection, closing");
393                Poll::Ready(Ok(()))
394            };
395
396            // order is important: should_error needs state BEFORE close_read
397            self.state.close_read();
398            return ret;
399        }
400
401        debug!(
402            "received unexpected {} bytes on an idle connection",
403            num_read
404        );
405        Poll::Ready(Err(crate::Error::new_unexpected_message()))
406    }
407
408    fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
409        debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
410        debug_assert!(self.is_mid_message());
411
412        if self.state.allow_half_close || !self.io.read_buf().is_empty() {
413            return Poll::Pending;
414        }
415
416        let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
417
418        if num_read == 0 {
419            trace!("found unexpected EOF on busy connection: {:?}", self.state);
420            self.state.close_read();
421            Poll::Ready(Err(crate::Error::new_incomplete()))
422        } else {
423            Poll::Ready(Ok(()))
424        }
425    }
426
427    fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
428        debug_assert!(!self.state.is_read_closed());
429
430        let result = ready!(self.io.poll_read_from_io(cx));
431        Poll::Ready(result.map_err(|e| {
432            trace!("force_io_read; io error = {:?}", e);
433            self.state.close();
434            e
435        }))
436    }
437
438    fn maybe_notify(&mut self, cx: &mut Context<'_>) {
439        // its possible that we returned NotReady from poll() without having
440        // exhausted the underlying Io. We would have done this when we
441        // determined we couldn't keep reading until we knew how writing
442        // would finish.
443
444        match self.state.reading {
445            Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
446                return
447            }
448            Reading::Init => (),
449        };
450
451        match self.state.writing {
452            Writing::Body(..) => return,
453            Writing::Init | Writing::KeepAlive | Writing::Closed => (),
454        }
455
456        if !self.io.is_read_blocked() {
457            if self.io.read_buf().is_empty() {
458                match self.io.poll_read_from_io(cx) {
459                    Poll::Ready(Ok(n)) => {
460                        if n == 0 {
461                            trace!("maybe_notify; read eof");
462                            if self.state.is_idle() {
463                                self.state.close();
464                            } else {
465                                self.close_read()
466                            }
467                            return;
468                        }
469                    }
470                    Poll::Pending => {
471                        trace!("maybe_notify; read_from_io blocked");
472                        return;
473                    }
474                    Poll::Ready(Err(e)) => {
475                        trace!("maybe_notify; read_from_io error: {}", e);
476                        self.state.close();
477                        self.state.error = Some(crate::Error::new_io(e));
478                    }
479                }
480            }
481            self.state.notify_read = true;
482        }
483    }
484
485    fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
486        self.state.try_keep_alive::<T>();
487        self.maybe_notify(cx);
488    }
489
490    pub(crate) fn can_write_head(&self) -> bool {
491        if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
492            return false;
493        }
494
495        match self.state.writing {
496            Writing::Init => self.io.can_headers_buf(),
497            _ => false,
498        }
499    }
500
501    pub(crate) fn can_write_body(&self) -> bool {
502        match self.state.writing {
503            Writing::Body(..) => true,
504            Writing::Init | Writing::KeepAlive | Writing::Closed => false,
505        }
506    }
507
508    pub(crate) fn can_buffer_body(&self) -> bool {
509        self.io.can_buffer()
510    }
511
512    pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
513        if let Some(encoder) = self.encode_head(head, body) {
514            self.state.writing = if !encoder.is_eof() {
515                Writing::Body(encoder)
516            } else if encoder.is_last() {
517                Writing::Closed
518            } else {
519                Writing::KeepAlive
520            };
521        }
522    }
523
524    pub(crate) fn write_full_msg(&mut self, head: MessageHead<T::Outgoing>, body: B) {
525        if let Some(encoder) =
526            self.encode_head(head, Some(BodyLength::Known(body.remaining() as u64)))
527        {
528            let is_last = encoder.is_last();
529            // Make sure we don't write a body if we weren't actually allowed
530            // to do so, like because its a HEAD request.
531            if !encoder.is_eof() {
532                encoder.danger_full_buf(body, self.io.write_buf());
533            }
534            self.state.writing = if is_last {
535                Writing::Closed
536            } else {
537                Writing::KeepAlive
538            }
539        }
540    }
541
542    fn encode_head(
543        &mut self,
544        mut head: MessageHead<T::Outgoing>,
545        body: Option<BodyLength>,
546    ) -> Option<Encoder> {
547        debug_assert!(self.can_write_head());
548
549        if !T::should_read_first() {
550            self.state.busy();
551        }
552
553        self.enforce_version(&mut head);
554
555        let buf = self.io.headers_buf();
556        match super::role::encode_headers::<T>(
557            Encode {
558                head: &mut head,
559                body,
560                #[cfg(feature = "server")]
561                keep_alive: self.state.wants_keep_alive(),
562                req_method: &mut self.state.method,
563                title_case_headers: self.state.title_case_headers,
564            },
565            buf,
566        ) {
567            Ok(encoder) => {
568                debug_assert!(self.state.cached_headers.is_none());
569                debug_assert!(head.headers.is_empty());
570                self.state.cached_headers = Some(head.headers);
571
572                #[cfg(feature = "ffi")]
573                {
574                    self.state.on_informational =
575                        head.extensions.remove::<crate::ffi::OnInformational>();
576                }
577
578                Some(encoder)
579            }
580            Err(err) => {
581                self.state.error = Some(err);
582                self.state.writing = Writing::Closed;
583                None
584            }
585        }
586    }
587
588    // Fix keep-alive when Connection: keep-alive header is not present
589    fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
590        let outgoing_is_keep_alive = head
591            .headers
592            .get(CONNECTION)
593            .map(connection_keep_alive)
594            .unwrap_or(false);
595
596        if !outgoing_is_keep_alive {
597            match head.version {
598                // If response is version 1.0 and keep-alive is not present in the response,
599                // disable keep-alive so the server closes the connection
600                Version::HTTP_10 => self.state.disable_keep_alive(),
601                // If response is version 1.1 and keep-alive is wanted, add
602                // Connection: keep-alive header when not present
603                Version::HTTP_11 => {
604                    if self.state.wants_keep_alive() {
605                        head.headers
606                            .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
607                    }
608                }
609                _ => (),
610            }
611        }
612    }
613
614    // If we know the remote speaks an older version, we try to fix up any messages
615    // to work with our older peer.
616    fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
617        if let Version::HTTP_10 = self.state.version {
618            // Fixes response or connection when keep-alive header is not present
619            self.fix_keep_alive(head);
620            // If the remote only knows HTTP/1.0, we should force ourselves
621            // to do only speak HTTP/1.0 as well.
622            head.version = Version::HTTP_10;
623        }
624        // If the remote speaks HTTP/1.1, then it *should* be fine with
625        // both HTTP/1.0 and HTTP/1.1 from us. So again, we just let
626        // the user's headers be.
627    }
628
629    pub(crate) fn write_body(&mut self, chunk: B) {
630        debug_assert!(self.can_write_body() && self.can_buffer_body());
631        // empty chunks should be discarded at Dispatcher level
632        debug_assert!(chunk.remaining() != 0);
633
634        let state = match self.state.writing {
635            Writing::Body(ref mut encoder) => {
636                self.io.buffer(encoder.encode(chunk));
637
638                if !encoder.is_eof() {
639                    return;
640                }
641
642                if encoder.is_last() {
643                    Writing::Closed
644                } else {
645                    Writing::KeepAlive
646                }
647            }
648            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
649        };
650
651        self.state.writing = state;
652    }
653
654    pub(crate) fn write_body_and_end(&mut self, chunk: B) {
655        debug_assert!(self.can_write_body() && self.can_buffer_body());
656        // empty chunks should be discarded at Dispatcher level
657        debug_assert!(chunk.remaining() != 0);
658
659        let state = match self.state.writing {
660            Writing::Body(ref encoder) => {
661                let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
662                if can_keep_alive {
663                    Writing::KeepAlive
664                } else {
665                    Writing::Closed
666                }
667            }
668            _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
669        };
670
671        self.state.writing = state;
672    }
673
674    pub(crate) fn end_body(&mut self) -> crate::Result<()> {
675        debug_assert!(self.can_write_body());
676
677        let encoder = match self.state.writing {
678            Writing::Body(ref mut enc) => enc,
679            _ => return Ok(()),
680        };
681
682        // end of stream, that means we should try to eof
683        match encoder.end() {
684            Ok(end) => {
685                if let Some(end) = end {
686                    self.io.buffer(end);
687                }
688
689                self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
690                    Writing::Closed
691                } else {
692                    Writing::KeepAlive
693                };
694
695                Ok(())
696            }
697            Err(not_eof) => {
698                self.state.writing = Writing::Closed;
699                Err(crate::Error::new_body_write_aborted().with(not_eof))
700            }
701        }
702    }
703
704    // When we get a parse error, depending on what side we are, we might be able
705    // to write a response before closing the connection.
706    //
707    // - Client: there is nothing we can do
708    // - Server: if Response hasn't been written yet, we can send a 4xx response
709    fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
710        if let Writing::Init = self.state.writing {
711            if self.has_h2_prefix() {
712                return Err(crate::Error::new_version_h2());
713            }
714            if let Some(msg) = T::on_error(&err) {
715                // Drop the cached headers so as to not trigger a debug
716                // assert in `write_head`...
717                self.state.cached_headers.take();
718                self.write_head(msg, None);
719                self.state.error = Some(err);
720                return Ok(());
721            }
722        }
723
724        // fallback is pass the error back up
725        Err(err)
726    }
727
728    pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
729        ready!(Pin::new(&mut self.io).poll_flush(cx))?;
730        self.try_keep_alive(cx);
731        trace!("flushed({}): {:?}", T::LOG, self.state);
732        Poll::Ready(Ok(()))
733    }
734
735    pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
736        match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
737            Ok(()) => {
738                trace!("shut down IO complete");
739                Poll::Ready(Ok(()))
740            }
741            Err(e) => {
742                debug!("error shutting down IO: {}", e);
743                Poll::Ready(Err(e))
744            }
745        }
746    }
747
748    /// If the read side can be cheaply drained, do so. Otherwise, close.
749    pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
750        if let Reading::Continue(ref decoder) = self.state.reading {
751            // skip sending the 100-continue
752            // just move forward to a read, in case a tiny body was included
753            self.state.reading = Reading::Body(decoder.clone());
754        }
755
756        let _ = self.poll_read_body(cx);
757
758        // If still in Reading::Body, just give up
759        match self.state.reading {
760            Reading::Init | Reading::KeepAlive => trace!("body drained"),
761            _ => self.close_read(),
762        }
763    }
764
765    pub(crate) fn close_read(&mut self) {
766        self.state.close_read();
767    }
768
769    pub(crate) fn close_write(&mut self) {
770        self.state.close_write();
771    }
772
773    #[cfg(feature = "server")]
774    pub(crate) fn disable_keep_alive(&mut self) {
775        if self.state.is_idle() {
776            trace!("disable_keep_alive; closing idle connection");
777            self.state.close();
778        } else {
779            trace!("disable_keep_alive; in-progress connection");
780            self.state.disable_keep_alive();
781        }
782    }
783
784    pub(crate) fn take_error(&mut self) -> crate::Result<()> {
785        if let Some(err) = self.state.error.take() {
786            Err(err)
787        } else {
788            Ok(())
789        }
790    }
791
792    pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
793        trace!("{}: prepare possible HTTP upgrade", T::LOG);
794        self.state.prepare_upgrade()
795    }
796}
797
798impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
799    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800        f.debug_struct("Conn")
801            .field("state", &self.state)
802            .field("io", &self.io)
803            .finish()
804    }
805}
806
807// B and T are never pinned
808impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
809
810struct State {
811    allow_half_close: bool,
812    /// Re-usable HeaderMap to reduce allocating new ones.
813    cached_headers: Option<HeaderMap>,
814    /// If an error occurs when there wasn't a direct way to return it
815    /// back to the user, this is set.
816    error: Option<crate::Error>,
817    /// Current keep-alive status.
818    keep_alive: KA,
819    /// If mid-message, the HTTP Method that started it.
820    ///
821    /// This is used to know things such as if the message can include
822    /// a body or not.
823    method: Option<Method>,
824    h1_parser_config: ParserConfig,
825    #[cfg(all(feature = "server", feature = "runtime"))]
826    h1_header_read_timeout: Option<Duration>,
827    #[cfg(all(feature = "server", feature = "runtime"))]
828    h1_header_read_timeout_fut: Option<Pin<Box<Sleep>>>,
829    #[cfg(all(feature = "server", feature = "runtime"))]
830    h1_header_read_timeout_running: bool,
831    preserve_header_case: bool,
832    #[cfg(feature = "ffi")]
833    preserve_header_order: bool,
834    title_case_headers: bool,
835    h09_responses: bool,
836    /// If set, called with each 1xx informational response received for
837    /// the current request. MUST be unset after a non-1xx response is
838    /// received.
839    #[cfg(feature = "ffi")]
840    on_informational: Option<crate::ffi::OnInformational>,
841    #[cfg(feature = "ffi")]
842    raw_headers: bool,
843    /// Set to true when the Dispatcher should poll read operations
844    /// again. See the `maybe_notify` method for more.
845    notify_read: bool,
846    /// State of allowed reads
847    reading: Reading,
848    /// State of allowed writes
849    writing: Writing,
850    /// An expected pending HTTP upgrade.
851    upgrade: Option<crate::upgrade::Pending>,
852    /// Either HTTP/1.0 or 1.1 connection
853    version: Version,
854}
855
856#[derive(Debug)]
857enum Reading {
858    Init,
859    Continue(Decoder),
860    Body(Decoder),
861    KeepAlive,
862    Closed,
863}
864
865enum Writing {
866    Init,
867    Body(Encoder),
868    KeepAlive,
869    Closed,
870}
871
872impl fmt::Debug for State {
873    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874        let mut builder = f.debug_struct("State");
875        builder
876            .field("reading", &self.reading)
877            .field("writing", &self.writing)
878            .field("keep_alive", &self.keep_alive);
879
880        // Only show error field if it's interesting...
881        if let Some(ref error) = self.error {
882            builder.field("error", error);
883        }
884
885        if self.allow_half_close {
886            builder.field("allow_half_close", &true);
887        }
888
889        // Purposefully leaving off other fields..
890
891        builder.finish()
892    }
893}
894
895impl fmt::Debug for Writing {
896    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
897        match *self {
898            Writing::Init => f.write_str("Init"),
899            Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
900            Writing::KeepAlive => f.write_str("KeepAlive"),
901            Writing::Closed => f.write_str("Closed"),
902        }
903    }
904}
905
906impl std::ops::BitAndAssign<bool> for KA {
907    fn bitand_assign(&mut self, enabled: bool) {
908        if !enabled {
909            trace!("remote disabling keep-alive");
910            *self = KA::Disabled;
911        }
912    }
913}
914
915#[derive(Clone, Copy, Debug)]
916enum KA {
917    Idle,
918    Busy,
919    Disabled,
920}
921
922impl Default for KA {
923    fn default() -> KA {
924        KA::Busy
925    }
926}
927
928impl KA {
929    fn idle(&mut self) {
930        *self = KA::Idle;
931    }
932
933    fn busy(&mut self) {
934        *self = KA::Busy;
935    }
936
937    fn disable(&mut self) {
938        *self = KA::Disabled;
939    }
940
941    fn status(&self) -> KA {
942        *self
943    }
944}
945
946impl State {
947    fn close(&mut self) {
948        trace!("State::close()");
949        self.reading = Reading::Closed;
950        self.writing = Writing::Closed;
951        self.keep_alive.disable();
952    }
953
954    fn close_read(&mut self) {
955        trace!("State::close_read()");
956        self.reading = Reading::Closed;
957        self.keep_alive.disable();
958    }
959
960    fn close_write(&mut self) {
961        trace!("State::close_write()");
962        self.writing = Writing::Closed;
963        self.keep_alive.disable();
964    }
965
966    fn wants_keep_alive(&self) -> bool {
967        if let KA::Disabled = self.keep_alive.status() {
968            false
969        } else {
970            true
971        }
972    }
973
974    fn try_keep_alive<T: Http1Transaction>(&mut self) {
975        match (&self.reading, &self.writing) {
976            (&Reading::KeepAlive, &Writing::KeepAlive) => {
977                if let KA::Busy = self.keep_alive.status() {
978                    self.idle::<T>();
979                } else {
980                    trace!(
981                        "try_keep_alive({}): could keep-alive, but status = {:?}",
982                        T::LOG,
983                        self.keep_alive
984                    );
985                    self.close();
986                }
987            }
988            (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
989                self.close()
990            }
991            _ => (),
992        }
993    }
994
995    fn disable_keep_alive(&mut self) {
996        self.keep_alive.disable()
997    }
998
999    fn busy(&mut self) {
1000        if let KA::Disabled = self.keep_alive.status() {
1001            return;
1002        }
1003        self.keep_alive.busy();
1004    }
1005
1006    fn idle<T: Http1Transaction>(&mut self) {
1007        debug_assert!(!self.is_idle(), "State::idle() called while idle");
1008
1009        self.method = None;
1010        self.keep_alive.idle();
1011
1012        if !self.is_idle() {
1013            self.close();
1014            return;
1015        }
1016
1017        self.reading = Reading::Init;
1018        self.writing = Writing::Init;
1019
1020        // !T::should_read_first() means Client.
1021        //
1022        // If Client connection has just gone idle, the Dispatcher
1023        // should try the poll loop one more time, so as to poll the
1024        // pending requests stream.
1025        if !T::should_read_first() {
1026            self.notify_read = true;
1027        }
1028    }
1029
1030    fn is_idle(&self) -> bool {
1031        matches!(self.keep_alive.status(), KA::Idle)
1032    }
1033
1034    fn is_read_closed(&self) -> bool {
1035        matches!(self.reading, Reading::Closed)
1036    }
1037
1038    fn is_write_closed(&self) -> bool {
1039        matches!(self.writing, Writing::Closed)
1040    }
1041
1042    fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1043        let (tx, rx) = crate::upgrade::pending();
1044        self.upgrade = Some(tx);
1045        rx
1046    }
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051    #[cfg(feature = "nightly")]
1052    #[bench]
1053    fn bench_read_head_short(b: &mut ::test::Bencher) {
1054        use super::*;
1055        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
1056        let len = s.len();
1057        b.bytes = len as u64;
1058
1059        // an empty IO, we'll be skipping and using the read buffer anyways
1060        let io = tokio_test::io::Builder::new().build();
1061        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
1062        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
1063        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));
1064
1065        let rt = tokio::runtime::Builder::new_current_thread()
1066            .enable_all()
1067            .build()
1068            .unwrap();
1069
1070        b.iter(|| {
1071            rt.block_on(futures_util::future::poll_fn(|cx| {
1072                match conn.poll_read_head(cx) {
1073                    Poll::Ready(Some(Ok(x))) => {
1074                        ::test::black_box(&x);
1075                        let mut headers = x.0.headers;
1076                        headers.clear();
1077                        conn.state.cached_headers = Some(headers);
1078                    }
1079                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
1080                }
1081
1082                conn.io.read_buf_mut().reserve(1);
1083                unsafe {
1084                    conn.io.read_buf_mut().set_len(len);
1085                }
1086                conn.state.reading = Reading::Init;
1087                Poll::Ready(())
1088            }));
1089        });
1090    }
1091
1092    /*
1093    //TODO: rewrite these using dispatch... someday...
1094    use futures::{Async, Future, Stream, Sink};
1095    use futures::future;
1096
1097    use proto::{self, ClientTransaction, MessageHead, ServerTransaction};
1098    use super::super::Encoder;
1099    use mock::AsyncIo;
1100
1101    use super::{Conn, Decoder, Reading, Writing};
1102    use ::uri::Uri;
1103
1104    use std::str::FromStr;
1105
1106    #[test]
1107    fn test_conn_init_read() {
1108        let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec();
1109        let len = good_message.len();
1110        let io = AsyncIo::new_buf(good_message, len);
1111        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1112
1113        match conn.poll().unwrap() {
1114            Async::Ready(Some(Frame::Message { message, body: false })) => {
1115                assert_eq!(message, MessageHead {
1116                    subject: ::proto::RequestLine(::Get, Uri::from_str("/").unwrap()),
1117                    .. MessageHead::default()
1118                })
1119            },
1120            f => panic!("frame is not Frame::Message: {:?}", f)
1121        }
1122    }
1123
1124    #[test]
1125    fn test_conn_parse_partial() {
1126        let _: Result<(), ()> = future::lazy(|| {
1127            let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
1128            let io = AsyncIo::new_buf(good_message, 10);
1129            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1130            assert!(conn.poll().unwrap().is_not_ready());
1131            conn.io.io_mut().block_in(50);
1132            let async = conn.poll().unwrap();
1133            assert!(async.is_ready());
1134            match async {
1135                Async::Ready(Some(Frame::Message { .. })) => (),
1136                f => panic!("frame is not Message: {:?}", f),
1137            }
1138            Ok(())
1139        }).wait();
1140    }
1141
1142    #[test]
1143    fn test_conn_init_read_eof_idle() {
1144        let io = AsyncIo::new_buf(vec![], 1);
1145        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1146        conn.state.idle();
1147
1148        match conn.poll().unwrap() {
1149            Async::Ready(None) => {},
1150            other => panic!("frame is not None: {:?}", other)
1151        }
1152    }
1153
1154    #[test]
1155    fn test_conn_init_read_eof_idle_partial_parse() {
1156        let io = AsyncIo::new_buf(b"GET / HTTP/1.1".to_vec(), 100);
1157        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1158        conn.state.idle();
1159
1160        match conn.poll() {
1161            Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1162            other => panic!("unexpected frame: {:?}", other)
1163        }
1164    }
1165
1166    #[test]
1167    fn test_conn_init_read_eof_busy() {
1168        let _: Result<(), ()> = future::lazy(|| {
1169            // server ignores
1170            let io = AsyncIo::new_eof();
1171            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1172            conn.state.busy();
1173
1174            match conn.poll().unwrap() {
1175                Async::Ready(None) => {},
1176                other => panic!("unexpected frame: {:?}", other)
1177            }
1178
1179            // client
1180            let io = AsyncIo::new_eof();
1181            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1182            conn.state.busy();
1183
1184            match conn.poll() {
1185                Err(ref err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {},
1186                other => panic!("unexpected frame: {:?}", other)
1187            }
1188            Ok(())
1189        }).wait();
1190    }
1191
1192    #[test]
1193    fn test_conn_body_finish_read_eof() {
1194        let _: Result<(), ()> = future::lazy(|| {
1195            let io = AsyncIo::new_eof();
1196            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1197            conn.state.busy();
1198            conn.state.writing = Writing::KeepAlive;
1199            conn.state.reading = Reading::Body(Decoder::length(0));
1200
1201            match conn.poll() {
1202                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1203                other => panic!("unexpected frame: {:?}", other)
1204            }
1205
1206            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1207            // the conn eof in this case is perfectly fine
1208
1209            match conn.poll() {
1210                Ok(Async::Ready(None)) => (),
1211                other => panic!("unexpected frame: {:?}", other)
1212            }
1213            Ok(())
1214        }).wait();
1215    }
1216
1217    #[test]
1218    fn test_conn_message_empty_body_read_eof() {
1219        let _: Result<(), ()> = future::lazy(|| {
1220            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".to_vec(), 1024);
1221            let mut conn = Conn::<_, proto::Bytes, ClientTransaction>::new(io);
1222            conn.state.busy();
1223            conn.state.writing = Writing::KeepAlive;
1224
1225            match conn.poll() {
1226                Ok(Async::Ready(Some(Frame::Message { body: false, .. }))) => (),
1227                other => panic!("unexpected frame: {:?}", other)
1228            }
1229
1230            // conn eofs, but tokio-proto will call poll() again, before calling flush()
1231            // the conn eof in this case is perfectly fine
1232
1233            match conn.poll() {
1234                Ok(Async::Ready(None)) => (),
1235                other => panic!("unexpected frame: {:?}", other)
1236            }
1237            Ok(())
1238        }).wait();
1239    }
1240
1241    #[test]
1242    fn test_conn_read_body_end() {
1243        let _: Result<(), ()> = future::lazy(|| {
1244            let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024);
1245            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1246            conn.state.busy();
1247
1248            match conn.poll() {
1249                Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (),
1250                other => panic!("unexpected frame: {:?}", other)
1251            }
1252
1253            match conn.poll() {
1254                Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (),
1255                other => panic!("unexpected frame: {:?}", other)
1256            }
1257
1258            // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None`
1259            match conn.poll() {
1260                Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (),
1261                other => panic!("unexpected frame: {:?}", other)
1262            }
1263
1264            match conn.poll() {
1265                Ok(Async::NotReady) => (),
1266                other => panic!("unexpected frame: {:?}", other)
1267            }
1268            Ok(())
1269        }).wait();
1270    }
1271
1272    #[test]
1273    fn test_conn_closed_read() {
1274        let io = AsyncIo::new_buf(vec![], 0);
1275        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1276        conn.state.close();
1277
1278        match conn.poll().unwrap() {
1279            Async::Ready(None) => {},
1280            other => panic!("frame is not None: {:?}", other)
1281        }
1282    }
1283
1284    #[test]
1285    fn test_conn_body_write_length() {
1286        let _ = pretty_env_logger::try_init();
1287        let _: Result<(), ()> = future::lazy(|| {
1288            let io = AsyncIo::new_buf(vec![], 0);
1289            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1290            let max = super::super::io::DEFAULT_MAX_BUFFER_SIZE + 4096;
1291            conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64));
1292
1293            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready());
1294            assert!(!conn.can_buffer_body());
1295
1296            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready());
1297
1298            conn.io.io_mut().block_in(1024 * 3);
1299            assert!(conn.poll_complete().unwrap().is_not_ready());
1300            conn.io.io_mut().block_in(1024 * 3);
1301            assert!(conn.poll_complete().unwrap().is_not_ready());
1302            conn.io.io_mut().block_in(max * 2);
1303            assert!(conn.poll_complete().unwrap().is_ready());
1304
1305            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'c'; 1024 * 8].into()) }).unwrap().is_ready());
1306            Ok(())
1307        }).wait();
1308    }
1309
1310    #[test]
1311    fn test_conn_body_write_chunked() {
1312        let _: Result<(), ()> = future::lazy(|| {
1313            let io = AsyncIo::new_buf(vec![], 4096);
1314            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1315            conn.state.writing = Writing::Body(Encoder::chunked());
1316
1317            assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready());
1318            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready());
1319            Ok(())
1320        }).wait();
1321    }
1322
1323    #[test]
1324    fn test_conn_body_flush() {
1325        let _: Result<(), ()> = future::lazy(|| {
1326            let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
1327            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1328            conn.state.writing = Writing::Body(Encoder::length(1024 * 1024));
1329            assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready());
1330            assert!(!conn.can_buffer_body());
1331            conn.io.io_mut().block_in(1024 * 1024 * 5);
1332            assert!(conn.poll_complete().unwrap().is_ready());
1333            assert!(conn.can_buffer_body());
1334            assert!(conn.io.io_mut().flushed());
1335
1336            Ok(())
1337        }).wait();
1338    }
1339
1340    #[test]
1341    fn test_conn_parking() {
1342        use std::sync::Arc;
1343        use futures::executor::Notify;
1344        use futures::executor::NotifyHandle;
1345
1346        struct Car {
1347            permit: bool,
1348        }
1349        impl Notify for Car {
1350            fn notify(&self, _id: usize) {
1351                assert!(self.permit, "unparked without permit");
1352            }
1353        }
1354
1355        fn car(permit: bool) -> NotifyHandle {
1356            Arc::new(Car {
1357                permit: permit,
1358            }).into()
1359        }
1360
1361        // test that once writing is done, unparks
1362        let f = future::lazy(|| {
1363            let io = AsyncIo::new_buf(vec![], 4096);
1364            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1365            conn.state.reading = Reading::KeepAlive;
1366            assert!(conn.poll().unwrap().is_not_ready());
1367
1368            conn.state.writing = Writing::KeepAlive;
1369            assert!(conn.poll_complete().unwrap().is_ready());
1370            Ok::<(), ()>(())
1371        });
1372        ::futures::executor::spawn(f).poll_future_notify(&car(true), 0).unwrap();
1373
1374
1375        // test that flushing when not waiting on read doesn't unpark
1376        let f = future::lazy(|| {
1377            let io = AsyncIo::new_buf(vec![], 4096);
1378            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1379            conn.state.writing = Writing::KeepAlive;
1380            assert!(conn.poll_complete().unwrap().is_ready());
1381            Ok::<(), ()>(())
1382        });
1383        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1384
1385
1386        // test that flushing and writing isn't done doesn't unpark
1387        let f = future::lazy(|| {
1388            let io = AsyncIo::new_buf(vec![], 4096);
1389            let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1390            conn.state.reading = Reading::KeepAlive;
1391            assert!(conn.poll().unwrap().is_not_ready());
1392            conn.state.writing = Writing::Body(Encoder::length(5_000));
1393            assert!(conn.poll_complete().unwrap().is_ready());
1394            Ok::<(), ()>(())
1395        });
1396        ::futures::executor::spawn(f).poll_future_notify(&car(false), 0).unwrap();
1397    }
1398
1399    #[test]
1400    fn test_conn_closed_write() {
1401        let io = AsyncIo::new_buf(vec![], 0);
1402        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1403        conn.state.close();
1404
1405        match conn.start_send(Frame::Body { chunk: Some(b"foobar".to_vec().into()) }) {
1406            Err(_e) => {},
1407            other => panic!("did not return Err: {:?}", other)
1408        }
1409
1410        assert!(conn.state.is_write_closed());
1411    }
1412
1413    #[test]
1414    fn test_conn_write_empty_chunk() {
1415        let io = AsyncIo::new_buf(vec![], 0);
1416        let mut conn = Conn::<_, proto::Bytes, ServerTransaction>::new(io);
1417        conn.state.writing = Writing::KeepAlive;
1418
1419        assert!(conn.start_send(Frame::Body { chunk: None }).unwrap().is_ready());
1420        assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready());
1421        conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err();
1422    }
1423    */
1424}