1use std::error::Error as StdError;
2use std::fmt;
3use std::io;
4use std::task::{Context, Poll};
5use std::usize;
6
7use bytes::Bytes;
8use tracing::{debug, trace};
9
10use super::io::MemRead;
11use super::DecodedLength;
12
13use self::Kind::{Chunked, Eof, Length};
14
/// Upper bound on the number of chunk-extension bytes the decoder tolerates.
///
/// `ChunkedState::read_extension` bumps a running counter for every extension
/// byte and fails once the counter reaches this value. Nothing in this file
/// resets that counter between chunks, so the budget is shared by the whole
/// body rather than granted per chunk.
const CHUNKED_EXTENSIONS_LIMIT: u64 = 16 * 1024;
19
20#[derive(Clone, PartialEq)]
25pub(crate) struct Decoder {
26 kind: Kind,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq)]
30enum Kind {
31 Length(u64),
33 Chunked {
35 state: ChunkedState,
36 chunk_len: u64,
37 extensions_cnt: u64,
38 },
39 Eof(bool),
56}
57
/// Positions of the chunked-body state machine.
///
/// Each variant names what the decoder expects to read next.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ChunkedState {
    /// Beginning of a chunk-size line; the first byte must be a hex digit.
    Start,
    /// Inside the hex size; more digits, whitespace, `;` or CR may follow.
    Size,
    /// Whitespace after the size; only whitespace, `;` or CR may follow.
    SizeLws,
    /// Inside a chunk extension; bytes are skipped (and counted) until CR.
    Extension,
    /// The LF that terminates the chunk-size line.
    SizeLf,
    /// The chunk's data bytes.
    Body,
    /// The CR that follows the chunk's data.
    BodyCr,
    /// The LF that follows the chunk's data.
    BodyLf,
    /// Inside a trailer line; bytes are skipped until CR.
    Trailer,
    /// The LF that terminates a trailer line.
    TrailerLf,
    /// After the zero-size chunk line or a trailer line: CR ends the body,
    /// any other byte starts a trailer line.
    EndCr,
    /// The final LF of the body.
    EndLf,
    /// The body has been fully decoded.
    End,
}
74
75impl Decoder {
76 pub(crate) fn length(x: u64) -> Decoder {
79 Decoder {
80 kind: Kind::Length(x),
81 }
82 }
83
84 pub(crate) fn chunked() -> Decoder {
85 Decoder {
86 kind: Kind::Chunked {
87 state: ChunkedState::new(),
88 chunk_len: 0,
89 extensions_cnt: 0,
90 },
91 }
92 }
93
94 pub(crate) fn eof() -> Decoder {
95 Decoder {
96 kind: Kind::Eof(false),
97 }
98 }
99
100 pub(super) fn new(len: DecodedLength) -> Self {
101 match len {
102 DecodedLength::CHUNKED => Decoder::chunked(),
103 DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
104 length => Decoder::length(length.danger_len()),
105 }
106 }
107
108 pub(crate) fn is_eof(&self) -> bool {
111 matches!(
112 self.kind,
113 Length(0)
114 | Chunked {
115 state: ChunkedState::End,
116 ..
117 }
118 | Eof(true)
119 )
120 }
121
122 pub(crate) fn decode<R: MemRead>(
123 &mut self,
124 cx: &mut Context<'_>,
125 body: &mut R,
126 ) -> Poll<Result<Bytes, io::Error>> {
127 trace!("decode; state={:?}", self.kind);
128 match self.kind {
129 Length(ref mut remaining) => {
130 if *remaining == 0 {
131 Poll::Ready(Ok(Bytes::new()))
132 } else {
133 let to_read = *remaining as usize;
134 let buf = ready!(body.read_mem(cx, to_read))?;
135 let num = buf.as_ref().len() as u64;
136 if num > *remaining {
137 *remaining = 0;
138 } else if num == 0 {
139 return Poll::Ready(Err(io::Error::new(
140 io::ErrorKind::UnexpectedEof,
141 IncompleteBody,
142 )));
143 } else {
144 *remaining -= num;
145 }
146 Poll::Ready(Ok(buf))
147 }
148 }
149 Chunked {
150 ref mut state,
151 ref mut chunk_len,
152 ref mut extensions_cnt,
153 } => {
154 loop {
155 let mut buf = None;
156 *state = ready!(state.step(cx, body, chunk_len, extensions_cnt, &mut buf))?;
158 if *state == ChunkedState::End {
159 trace!("end of chunked");
160 return Poll::Ready(Ok(Bytes::new()));
161 }
162 if let Some(buf) = buf {
163 return Poll::Ready(Ok(buf));
164 }
165 }
166 }
167 Eof(ref mut is_eof) => {
168 if *is_eof {
169 Poll::Ready(Ok(Bytes::new()))
170 } else {
171 body.read_mem(cx, 8192).map_ok(|slice| {
175 *is_eof = slice.is_empty();
176 slice
177 })
178 }
179 }
180 }
181 }
182
183 #[cfg(test)]
184 async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
185 futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
186 }
187}
188
189impl fmt::Debug for Decoder {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 fmt::Debug::fmt(&self.kind, f)
192 }
193}
194
// Reads exactly one byte from `$rdr` and evaluates to it.
//
// Returns early from the enclosing function when the read is pending, when
// the reader reports an error, or (with `UnexpectedEof`) when the reader
// yields an empty read.
macro_rules! byte {
    ($rdr:ident, $cx:expr) => {{
        let chunk = ready!($rdr.read_mem($cx, 1))?;
        if chunk.is_empty() {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "unexpected EOF during chunk size line",
            )));
        }
        chunk[0]
    }};
}
206
// Unwraps the `Option` produced by a checked arithmetic operation, returning
// an `InvalidData` error from the enclosing function when it is `None`.
macro_rules! or_overflow {
    ($e:expr) => {
        if let Some(val) = $e {
            val
        } else {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "invalid chunk size: overflow",
            )));
        }
    };
}
218
219impl ChunkedState {
220 fn new() -> ChunkedState {
221 ChunkedState::Start
222 }
223 fn step<R: MemRead>(
224 &self,
225 cx: &mut Context<'_>,
226 body: &mut R,
227 size: &mut u64,
228 extensions_cnt: &mut u64,
229 buf: &mut Option<Bytes>,
230 ) -> Poll<Result<ChunkedState, io::Error>> {
231 use self::ChunkedState::*;
232 match *self {
233 Start => ChunkedState::read_start(cx, body, size),
234 Size => ChunkedState::read_size(cx, body, size),
235 SizeLws => ChunkedState::read_size_lws(cx, body),
236 Extension => ChunkedState::read_extension(cx, body, extensions_cnt),
237 SizeLf => ChunkedState::read_size_lf(cx, body, *size),
238 Body => ChunkedState::read_body(cx, body, size, buf),
239 BodyCr => ChunkedState::read_body_cr(cx, body),
240 BodyLf => ChunkedState::read_body_lf(cx, body),
241 Trailer => ChunkedState::read_trailer(cx, body),
242 TrailerLf => ChunkedState::read_trailer_lf(cx, body),
243 EndCr => ChunkedState::read_end_cr(cx, body),
244 EndLf => ChunkedState::read_end_lf(cx, body),
245 End => Poll::Ready(Ok(ChunkedState::End)),
246 }
247 }
248
249 fn read_start<R: MemRead>(
250 cx: &mut Context<'_>,
251 rdr: &mut R,
252 size: &mut u64,
253 ) -> Poll<Result<ChunkedState, io::Error>> {
254 trace!("Read chunk start");
255
256 let radix = 16;
257 match byte!(rdr, cx) {
258 b @ b'0'..=b'9' => {
259 *size = or_overflow!(size.checked_mul(radix));
260 *size = or_overflow!(size.checked_add((b - b'0') as u64));
261 }
262 b @ b'a'..=b'f' => {
263 *size = or_overflow!(size.checked_mul(radix));
264 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
265 }
266 b @ b'A'..=b'F' => {
267 *size = or_overflow!(size.checked_mul(radix));
268 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
269 }
270 _ => {
271 return Poll::Ready(Err(io::Error::new(
272 io::ErrorKind::InvalidInput,
273 "Invalid chunk size line: missing size digit",
274 )));
275 }
276 }
277
278 Poll::Ready(Ok(ChunkedState::Size))
279 }
280
281 fn read_size<R: MemRead>(
282 cx: &mut Context<'_>,
283 rdr: &mut R,
284 size: &mut u64,
285 ) -> Poll<Result<ChunkedState, io::Error>> {
286 trace!("Read chunk hex size");
287
288 let radix = 16;
289 match byte!(rdr, cx) {
290 b @ b'0'..=b'9' => {
291 *size = or_overflow!(size.checked_mul(radix));
292 *size = or_overflow!(size.checked_add((b - b'0') as u64));
293 }
294 b @ b'a'..=b'f' => {
295 *size = or_overflow!(size.checked_mul(radix));
296 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
297 }
298 b @ b'A'..=b'F' => {
299 *size = or_overflow!(size.checked_mul(radix));
300 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
301 }
302 b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
303 b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
304 b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
305 _ => {
306 return Poll::Ready(Err(io::Error::new(
307 io::ErrorKind::InvalidInput,
308 "Invalid chunk size line: Invalid Size",
309 )));
310 }
311 }
312 Poll::Ready(Ok(ChunkedState::Size))
313 }
314 fn read_size_lws<R: MemRead>(
315 cx: &mut Context<'_>,
316 rdr: &mut R,
317 ) -> Poll<Result<ChunkedState, io::Error>> {
318 trace!("read_size_lws");
319 match byte!(rdr, cx) {
320 b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
322 b';' => Poll::Ready(Ok(ChunkedState::Extension)),
323 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
324 _ => Poll::Ready(Err(io::Error::new(
325 io::ErrorKind::InvalidInput,
326 "Invalid chunk size linear white space",
327 ))),
328 }
329 }
330 fn read_extension<R: MemRead>(
331 cx: &mut Context<'_>,
332 rdr: &mut R,
333 extensions_cnt: &mut u64,
334 ) -> Poll<Result<ChunkedState, io::Error>> {
335 trace!("read_extension");
336 match byte!(rdr, cx) {
343 b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
344 b'\n' => Poll::Ready(Err(io::Error::new(
345 io::ErrorKind::InvalidData,
346 "invalid chunk extension contains newline",
347 ))),
348 _ => {
349 *extensions_cnt += 1;
350 if *extensions_cnt >= CHUNKED_EXTENSIONS_LIMIT {
351 Poll::Ready(Err(io::Error::new(
352 io::ErrorKind::InvalidData,
353 "chunk extensions over limit",
354 )))
355 } else {
356 Poll::Ready(Ok(ChunkedState::Extension))
357 }
358 } }
360 }
361 fn read_size_lf<R: MemRead>(
362 cx: &mut Context<'_>,
363 rdr: &mut R,
364 size: u64,
365 ) -> Poll<Result<ChunkedState, io::Error>> {
366 trace!("Chunk size is {:?}", size);
367 match byte!(rdr, cx) {
368 b'\n' => {
369 if size == 0 {
370 Poll::Ready(Ok(ChunkedState::EndCr))
371 } else {
372 debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
373 Poll::Ready(Ok(ChunkedState::Body))
374 }
375 }
376 _ => Poll::Ready(Err(io::Error::new(
377 io::ErrorKind::InvalidInput,
378 "Invalid chunk size LF",
379 ))),
380 }
381 }
382
383 fn read_body<R: MemRead>(
384 cx: &mut Context<'_>,
385 rdr: &mut R,
386 rem: &mut u64,
387 buf: &mut Option<Bytes>,
388 ) -> Poll<Result<ChunkedState, io::Error>> {
389 trace!("Chunked read, remaining={:?}", rem);
390
391 let rem_cap = match *rem {
393 r if r > usize::MAX as u64 => usize::MAX,
394 r => r as usize,
395 };
396
397 let to_read = rem_cap;
398 let slice = ready!(rdr.read_mem(cx, to_read))?;
399 let count = slice.len();
400
401 if count == 0 {
402 *rem = 0;
403 return Poll::Ready(Err(io::Error::new(
404 io::ErrorKind::UnexpectedEof,
405 IncompleteBody,
406 )));
407 }
408 *buf = Some(slice);
409 *rem -= count as u64;
410
411 if *rem > 0 {
412 Poll::Ready(Ok(ChunkedState::Body))
413 } else {
414 Poll::Ready(Ok(ChunkedState::BodyCr))
415 }
416 }
417 fn read_body_cr<R: MemRead>(
418 cx: &mut Context<'_>,
419 rdr: &mut R,
420 ) -> Poll<Result<ChunkedState, io::Error>> {
421 match byte!(rdr, cx) {
422 b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
423 _ => Poll::Ready(Err(io::Error::new(
424 io::ErrorKind::InvalidInput,
425 "Invalid chunk body CR",
426 ))),
427 }
428 }
429 fn read_body_lf<R: MemRead>(
430 cx: &mut Context<'_>,
431 rdr: &mut R,
432 ) -> Poll<Result<ChunkedState, io::Error>> {
433 match byte!(rdr, cx) {
434 b'\n' => Poll::Ready(Ok(ChunkedState::Start)),
435 _ => Poll::Ready(Err(io::Error::new(
436 io::ErrorKind::InvalidInput,
437 "Invalid chunk body LF",
438 ))),
439 }
440 }
441
442 fn read_trailer<R: MemRead>(
443 cx: &mut Context<'_>,
444 rdr: &mut R,
445 ) -> Poll<Result<ChunkedState, io::Error>> {
446 trace!("read_trailer");
447 match byte!(rdr, cx) {
448 b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
449 _ => Poll::Ready(Ok(ChunkedState::Trailer)),
450 }
451 }
452 fn read_trailer_lf<R: MemRead>(
453 cx: &mut Context<'_>,
454 rdr: &mut R,
455 ) -> Poll<Result<ChunkedState, io::Error>> {
456 match byte!(rdr, cx) {
457 b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
458 _ => Poll::Ready(Err(io::Error::new(
459 io::ErrorKind::InvalidInput,
460 "Invalid trailer end LF",
461 ))),
462 }
463 }
464
465 fn read_end_cr<R: MemRead>(
466 cx: &mut Context<'_>,
467 rdr: &mut R,
468 ) -> Poll<Result<ChunkedState, io::Error>> {
469 match byte!(rdr, cx) {
470 b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
471 _ => Poll::Ready(Ok(ChunkedState::Trailer)),
472 }
473 }
474 fn read_end_lf<R: MemRead>(
475 cx: &mut Context<'_>,
476 rdr: &mut R,
477 ) -> Poll<Result<ChunkedState, io::Error>> {
478 match byte!(rdr, cx) {
479 b'\n' => Poll::Ready(Ok(ChunkedState::End)),
480 _ => Poll::Ready(Err(io::Error::new(
481 io::ErrorKind::InvalidInput,
482 "Invalid chunk end LF",
483 ))),
484 }
485 }
486}
487
/// Error payload used when the input ends before the announced body size has
/// been received; the decoder wraps it in an `io::Error` of kind
/// `UnexpectedEof`.
#[derive(Debug)]
struct IncompleteBody;

impl fmt::Display for IncompleteBody {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("end of file before message length reached")
    }
}

impl StdError for IncompleteBody {}
498
#[cfg(test)]
mod tests {
    use super::*;
    use std::pin::Pin;
    use std::time::Duration;
    use tokio::io::{AsyncRead, ReadBuf};

    // In-memory reader: hands out up to `len` bytes from the front of the
    // slice and advances past them.
    impl<'a> MemRead for &'a [u8] {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let take = len.min(self.len());
            if take == 0 {
                return Poll::Ready(Ok(Bytes::new()));
            }
            let (head, tail) = self.split_at(take);
            let chunk = Bytes::copy_from_slice(head);
            *self = tail;
            Poll::Ready(Ok(chunk))
        }
    }

    // Adapter that lets any `AsyncRead` act as a `MemRead` by reading into a
    // scratch buffer of the requested size.
    impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
        fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let mut scratch = vec![0u8; len];
            let mut read_buf = ReadBuf::new(&mut scratch);
            ready!(Pin::new(self).poll_read(cx, &mut read_buf))?;
            Poll::Ready(Ok(Bytes::copy_from_slice(read_buf.filled())))
        }
    }

    // `Bytes` reader: splits off up to `len` bytes from the front.
    impl MemRead for Bytes {
        fn read_mem(&mut self, _: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
            let take = len.min(self.len());
            Poll::Ready(Ok(self.split_to(take)))
        }
    }

    #[tokio::test]
    async fn test_read_chunk_size() {
        use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};

        // Steps the state machine over `s` until the size line is complete
        // and returns the parsed size; panics on any decode error.
        async fn read(s: &str) -> u64 {
            let mut state = ChunkedState::new();
            let input = &mut s.as_bytes();
            let (mut size, mut ext_cnt) = (0, 0);
            loop {
                let stepped = futures_util::future::poll_fn(|cx| {
                    state.step(cx, input, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                state = match stepped {
                    Ok(next) => next,
                    Err(e) => panic!("read_size failed for {:?}: {:?}", s, e),
                };
                if matches!(state, ChunkedState::Body | ChunkedState::EndCr) {
                    return size;
                }
            }
        }

        // Steps the state machine over `s` and requires it to fail with
        // `expected_err` before a body or the end is reached.
        async fn read_err(s: &str, expected_err: io::ErrorKind) {
            let mut state = ChunkedState::new();
            let input = &mut s.as_bytes();
            let (mut size, mut ext_cnt) = (0, 0);
            loop {
                let stepped = futures_util::future::poll_fn(|cx| {
                    state.step(cx, input, &mut size, &mut ext_cnt, &mut None)
                })
                .await;
                match stepped {
                    Err(e) => {
                        assert!(
                            expected_err == e.kind(),
                            "Reading {:?}, expected {:?}, but got {:?}",
                            s,
                            expected_err,
                            e.kind()
                        );
                        return;
                    }
                    Ok(next) => state = next,
                }
                if state == ChunkedState::Body || state == ChunkedState::End {
                    panic!("Was Ok. Expected Err for {:?}", s);
                }
            }
        }

        // Plain sizes, with and without leading zeros or trailing whitespace.
        let plain_sizes: &[(&str, u64)] = &[
            ("1\r\n", 1),
            ("01\r\n", 1),
            ("0\r\n", 0),
            ("00\r\n", 0),
            ("A\r\n", 10),
            ("a\r\n", 10),
            ("Ff\r\n", 255),
            ("Ff \r\n", 255),
        ];
        for &(input, expected) in plain_sizes {
            assert_eq!(expected, read(input).await);
        }

        // Malformed or truncated size lines.
        let bad_sizes: &[(&str, io::ErrorKind)] = &[
            ("F\rF", InvalidInput),
            ("F", UnexpectedEof),
            ("\r\n\r\n", InvalidInput),
            ("\r\n", InvalidInput),
            ("X\r\n", InvalidInput),
            ("1X\r\n", InvalidInput),
            ("-\r\n", InvalidInput),
            ("-1\r\n", InvalidInput),
        ];
        for &(input, kind) in bad_sizes {
            read_err(input, kind).await;
        }

        // Sizes followed by chunk extensions, which are skipped.
        let with_extensions: &[(&str, u64)] = &[
            ("1;extension\r\n", 1),
            ("a;ext name=value\r\n", 10),
            ("1;extension;extension2\r\n", 1),
            ("1;;; ;\r\n", 1),
            ("2; extension...\r\n", 2),
            ("3 ; extension=123\r\n", 3),
            ("3 ;\r\n", 3),
            ("3 ; \r\n", 3),
        ];
        for &(input, expected) in with_extensions {
            assert_eq!(expected, read(input).await);
        }

        // Invalid extensions, missing line endings, and size overflow.
        let bad_extensions: &[(&str, io::ErrorKind)] = &[
            ("1 invalid extension\r\n", InvalidInput),
            ("1 A\r\n", InvalidInput),
            ("1;no CRLF", UnexpectedEof),
            ("1;reject\nnewlines\r\n", InvalidData),
            ("f0000000000000003\r\n", InvalidData),
        ];
        for &(input, kind) in bad_extensions {
            read_err(input, kind).await;
        }
    }

    #[tokio::test]
    async fn test_read_sized_early_eof() {
        let mut input = &b"foo bar"[..];
        let mut decoder = Decoder::length(10);

        let first = decoder.decode_fut(&mut input).await.unwrap();
        assert_eq!(first.len(), 7);

        let e = decoder.decode_fut(&mut input).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_chunked_early_eof() {
        let mut input = &b"9\r\nfoo bar"[..];
        let mut decoder = Decoder::chunked();

        let first = decoder.decode_fut(&mut input).await.unwrap();
        assert_eq!(first.len(), 7);

        let e = decoder.decode_fut(&mut input).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_read_chunked_single_read() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
        let mut decoder = Decoder::chunked();
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(16, buf.len());

        let text = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &text);
    }

    #[tokio::test]
    async fn test_read_chunked_with_missing_zero_digit() {
        // The terminating chunk lacks its `0` size digit.
        let mut mock_buf = &b"1\r\nZ\r\n\r\n\r\n"[..];
        let mut decoder = Decoder::chunked();

        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!("Z", buf);

        let err = decoder
            .decode_fut(&mut mock_buf)
            .await
            .expect_err("decode 2");
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    #[tokio::test]
    async fn test_read_chunked_extensions_over_limit() {
        // Each chunk stays under the limit on its own; two together exceed it.
        let per_chunk = super::CHUNKED_EXTENSIONS_LIMIT * 2 / 3;
        let mut scratch = vec![];
        for _ in 0..2 {
            scratch.extend_from_slice(b"1;");
            scratch.extend(std::iter::repeat(b'x').take(per_chunk as usize));
            scratch.extend_from_slice(b"\r\nA\r\n");
        }
        scratch.extend_from_slice(b"0\r\n\r\n");
        let mut mock_buf = Bytes::from(scratch);

        let mut decoder = Decoder::chunked();
        let buf1 = decoder.decode_fut(&mut mock_buf).await.expect("decode1");
        assert_eq!(&buf1[..], b"A");

        let err = decoder
            .decode_fut(&mut mock_buf)
            .await
            .expect_err("decode2");
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert_eq!(err.to_string(), "chunk extensions over limit");
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn test_read_chunked_trailer_with_missing_lf() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
        let mut decoder = Decoder::chunked();
        decoder.decode_fut(&mut mock_buf).await.expect("decode");

        let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
        assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
    }

    #[tokio::test]
    async fn test_read_chunked_after_eof() {
        let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
        let mut decoder = Decoder::chunked();

        // The data chunk comes first.
        let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
        assert_eq!(16, buf.len());
        let text = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
        assert_eq!("1234567890abcdef", &text);

        // Then the end of the body is reported as an empty buffer.
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());

        // Decoding again after the end keeps yielding an empty buffer.
        let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
        assert_eq!(0, buf.len());
    }

    // Decodes `content` through a mock reader that pauses once at `block_at`
    // (before any data when `block_at` is 0) and returns the decoded text.
    async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
        let pause = Duration::from_millis(10);

        let mut builder = tokio_test::io::Builder::new();
        if block_at == 0 {
            builder.wait(pause).read(content);
        } else {
            let (before, after) = (&content[..block_at], &content[block_at..]);
            builder.read(before).wait(pause).read(after);
        }
        let mut mock = builder.build();
        let mut reader = &mut mock as &mut (dyn AsyncRead + Unpin);

        let mut collected = Vec::new();
        loop {
            let chunk = decoder
                .decode_fut(&mut reader)
                .await
                .expect("unexpected decode error");
            if chunk.is_empty() {
                // An empty buffer marks the end of the body.
                break;
            }
            collected.extend_from_slice(&chunk);
        }

        String::from_utf8(collected).expect("decode String")
    }

    // Runs `read_async` once for every possible pause position in `content`.
    async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
        let raw = content.as_bytes();
        for block_at in 0..content.len() {
            let actual = read_async(decoder.clone(), raw, block_at).await;
            assert_eq!(expected, &actual)
        }
    }

    #[tokio::test]
    async fn test_read_length_async() {
        let content = "foobar";
        let decoder = Decoder::length(content.len() as u64);
        all_async_cases(content, content, decoder).await;
    }

    #[tokio::test]
    async fn test_read_chunked_async() {
        let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
        let expected = "foobar";
        all_async_cases(content, expected, Decoder::chunked()).await;
    }

    #[tokio::test]
    async fn test_read_eof_async() {
        let content = "foobar";
        all_async_cases(content, content, Decoder::eof()).await;
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
        const LEN: usize = 1024;

        let rt = new_runtime();

        // One chunk of LEN zero bytes, framed with its hex size line.
        let mut framed = Vec::new();
        framed.extend(format!("{:x}\r\n", LEN).as_bytes());
        framed.extend(&[0; LEN][..]);
        framed.extend(b"\r\n");
        let content = Bytes::from(framed);

        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::chunked();
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_decode_length_1kb(b: &mut test::Bencher) {
        const LEN: usize = 1024;

        let rt = new_runtime();
        let content = Bytes::from(&[0; LEN][..]);
        b.bytes = LEN as u64;

        b.iter(|| {
            let mut decoder = Decoder::length(LEN as u64);
            rt.block_on(async {
                let mut raw = content.clone();
                let chunk = decoder.decode_fut(&mut raw).await.unwrap();
                assert_eq!(chunk.len(), LEN);
            });
        });
    }

    // Single-threaded runtime shared by the benchmarks.
    #[cfg(feature = "nightly")]
    fn new_runtime() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder.enable_all();
        builder.build().expect("rt build")
    }
}