1use std::cmp;
2use std::fmt;
3#[cfg(all(feature = "server", feature = "runtime"))]
4use std::future::Future;
5use std::io::{self, IoSlice};
6use std::marker::Unpin;
7use std::mem::MaybeUninit;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10#[cfg(all(feature = "server", feature = "runtime"))]
11use std::time::Duration;
12
13use bytes::{Buf, BufMut, Bytes, BytesMut};
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15#[cfg(all(feature = "server", feature = "runtime"))]
16use tokio::time::Instant;
17use tracing::{debug, trace};
18
19use super::{Http1Transaction, ParseContext, ParsedMessage};
20use crate::common::buf::BufList;
21
22pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
24
25pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;
27
28pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
32
33const MAX_BUF_LIST_BUFFERS: usize = 16;
39
40pub(crate) struct Buffered<T, B> {
41 flush_pipeline: bool,
42 io: T,
43 partial_len: Option<usize>,
44 read_blocked: bool,
45 read_buf: BytesMut,
46 read_buf_strategy: ReadStrategy,
47 write_buf: WriteBuf<B>,
48}
49
50impl<T, B> fmt::Debug for Buffered<T, B>
51where
52 B: Buf,
53{
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 f.debug_struct("Buffered")
56 .field("read_buf", &self.read_buf)
57 .field("write_buf", &self.write_buf)
58 .finish()
59 }
60}
61
62impl<T, B> Buffered<T, B>
63where
64 T: AsyncRead + AsyncWrite + Unpin,
65 B: Buf,
66{
67 pub(crate) fn new(io: T) -> Buffered<T, B> {
68 let strategy = if io.is_write_vectored() {
69 WriteStrategy::Queue
70 } else {
71 WriteStrategy::Flatten
72 };
73 let write_buf = WriteBuf::new(strategy);
74 Buffered {
75 flush_pipeline: false,
76 io,
77 partial_len: None,
78 read_blocked: false,
79 read_buf: BytesMut::with_capacity(0),
80 read_buf_strategy: ReadStrategy::default(),
81 write_buf,
82 }
83 }
84
85 #[cfg(feature = "server")]
86 pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
87 debug_assert!(!self.write_buf.has_remaining());
88 self.flush_pipeline = enabled;
89 if enabled {
90 self.set_write_strategy_flatten();
91 }
92 }
93
94 pub(crate) fn set_max_buf_size(&mut self, max: usize) {
95 assert!(
96 max >= MINIMUM_MAX_BUFFER_SIZE,
97 "The max_buf_size cannot be smaller than {}.",
98 MINIMUM_MAX_BUFFER_SIZE,
99 );
100 self.read_buf_strategy = ReadStrategy::with_max(max);
101 self.write_buf.max_buf_size = max;
102 }
103
104 #[cfg(feature = "client")]
105 pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
106 self.read_buf_strategy = ReadStrategy::Exact(sz);
107 }
108
109 pub(crate) fn set_write_strategy_flatten(&mut self) {
110 debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
113 self.write_buf.set_strategy(WriteStrategy::Flatten);
114 }
115
116 pub(crate) fn set_write_strategy_queue(&mut self) {
117 debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
120 self.write_buf.set_strategy(WriteStrategy::Queue);
121 }
122
123 pub(crate) fn read_buf(&self) -> &[u8] {
124 self.read_buf.as_ref()
125 }
126
127 #[cfg(test)]
128 #[cfg(feature = "nightly")]
129 pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
130 &mut self.read_buf
131 }
132
133 fn read_buf_remaining_mut(&self) -> usize {
136 self.read_buf.capacity() - self.read_buf.len()
137 }
138
139 pub(crate) fn can_headers_buf(&self) -> bool {
145 !self.write_buf.queue.has_remaining()
146 }
147
148 pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
149 let buf = self.write_buf.headers_mut();
150 &mut buf.bytes
151 }
152
153 pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
154 &mut self.write_buf
155 }
156
157 pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
158 self.write_buf.buffer(buf)
159 }
160
161 pub(crate) fn can_buffer(&self) -> bool {
162 self.flush_pipeline || self.write_buf.can_buffer()
163 }
164
165 pub(crate) fn consume_leading_lines(&mut self) {
166 if !self.read_buf.is_empty() {
167 let mut i = 0;
168 while i < self.read_buf.len() {
169 match self.read_buf[i] {
170 b'\r' | b'\n' => i += 1,
171 _ => break,
172 }
173 }
174 self.read_buf.advance(i);
175 }
176 }
177
178 pub(super) fn parse<S>(
179 &mut self,
180 cx: &mut Context<'_>,
181 parse_ctx: ParseContext<'_>,
182 ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
183 where
184 S: Http1Transaction,
185 {
186 loop {
187 match super::role::parse_headers::<S>(
188 &mut self.read_buf,
189 self.partial_len,
190 ParseContext {
191 cached_headers: parse_ctx.cached_headers,
192 req_method: parse_ctx.req_method,
193 h1_parser_config: parse_ctx.h1_parser_config.clone(),
194 #[cfg(all(feature = "server", feature = "runtime"))]
195 h1_header_read_timeout: parse_ctx.h1_header_read_timeout,
196 #[cfg(all(feature = "server", feature = "runtime"))]
197 h1_header_read_timeout_fut: parse_ctx.h1_header_read_timeout_fut,
198 #[cfg(all(feature = "server", feature = "runtime"))]
199 h1_header_read_timeout_running: parse_ctx.h1_header_read_timeout_running,
200 preserve_header_case: parse_ctx.preserve_header_case,
201 #[cfg(feature = "ffi")]
202 preserve_header_order: parse_ctx.preserve_header_order,
203 h09_responses: parse_ctx.h09_responses,
204 #[cfg(feature = "ffi")]
205 on_informational: parse_ctx.on_informational,
206 #[cfg(feature = "ffi")]
207 raw_headers: parse_ctx.raw_headers,
208 },
209 )? {
210 Some(msg) => {
211 debug!("parsed {} headers", msg.head.headers.len());
212
213 #[cfg(all(feature = "server", feature = "runtime"))]
214 {
215 *parse_ctx.h1_header_read_timeout_running = false;
216
217 if let Some(h1_header_read_timeout_fut) =
218 parse_ctx.h1_header_read_timeout_fut
219 {
220 h1_header_read_timeout_fut
222 .as_mut()
223 .reset(Instant::now() + Duration::from_secs(30 * 24 * 60 * 60));
224 }
225 }
226 self.partial_len = None;
227 return Poll::Ready(Ok(msg));
228 }
229 None => {
230 let max = self.read_buf_strategy.max();
231 let curr_len = self.read_buf.len();
232 if curr_len >= max {
233 debug!("max_buf_size ({}) reached, closing", max);
234 return Poll::Ready(Err(crate::Error::new_too_large()));
235 }
236
237 #[cfg(all(feature = "server", feature = "runtime"))]
238 if *parse_ctx.h1_header_read_timeout_running {
239 if let Some(h1_header_read_timeout_fut) =
240 parse_ctx.h1_header_read_timeout_fut
241 {
242 if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
243 *parse_ctx.h1_header_read_timeout_running = false;
244
245 tracing::warn!("read header from client timeout");
246 return Poll::Ready(Err(crate::Error::new_header_timeout()));
247 }
248 }
249 }
250 if curr_len > 0 {
251 trace!("partial headers; {} bytes so far", curr_len);
252 self.partial_len = Some(curr_len);
253 } else {
254 self.partial_len = None;
256 }
257 }
258 }
259 if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
260 trace!("parse eof");
261 return Poll::Ready(Err(crate::Error::new_incomplete()));
262 }
263 }
264 }
265
266 pub(crate) fn poll_read_from_io(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
267 self.read_blocked = false;
268 let next = self.read_buf_strategy.next();
269 if self.read_buf_remaining_mut() < next {
270 self.read_buf.reserve(next);
271 }
272
273 let dst = self.read_buf.chunk_mut();
274 let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
275 let mut buf = ReadBuf::uninit(dst);
276 match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
277 Poll::Ready(Ok(_)) => {
278 let n = buf.filled().len();
279 trace!("received {} bytes", n);
280 unsafe {
281 self.read_buf.advance_mut(n);
285 }
286 self.read_buf_strategy.record(n);
287 Poll::Ready(Ok(n))
288 }
289 Poll::Pending => {
290 self.read_blocked = true;
291 Poll::Pending
292 }
293 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
294 }
295 }
296
297 pub(crate) fn into_inner(self) -> (T, Bytes) {
298 (self.io, self.read_buf.freeze())
299 }
300
301 pub(crate) fn io_mut(&mut self) -> &mut T {
302 &mut self.io
303 }
304
305 pub(crate) fn is_read_blocked(&self) -> bool {
306 self.read_blocked
307 }
308
309 pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
310 if self.flush_pipeline && !self.read_buf.is_empty() {
311 Poll::Ready(Ok(()))
312 } else if self.write_buf.remaining() == 0 {
313 Pin::new(&mut self.io).poll_flush(cx)
314 } else {
315 if let WriteStrategy::Flatten = self.write_buf.strategy {
316 return self.poll_flush_flattened(cx);
317 }
318
319 const MAX_WRITEV_BUFS: usize = 64;
320 loop {
321 let n = {
322 let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
323 let len = self.write_buf.chunks_vectored(&mut iovs);
324 ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
325 };
326 self.write_buf.advance(n);
330 debug!("flushed {} bytes", n);
331 if self.write_buf.remaining() == 0 {
332 break;
333 } else if n == 0 {
334 trace!(
335 "write returned zero, but {} bytes remaining",
336 self.write_buf.remaining()
337 );
338 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
339 }
340 }
341 Pin::new(&mut self.io).poll_flush(cx)
342 }
343 }
344
345 fn poll_flush_flattened(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
350 loop {
351 let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
352 debug!("flushed {} bytes", n);
353 self.write_buf.headers.advance(n);
354 if self.write_buf.headers.remaining() == 0 {
355 self.write_buf.headers.reset();
356 break;
357 } else if n == 0 {
358 trace!(
359 "write returned zero, but {} bytes remaining",
360 self.write_buf.remaining()
361 );
362 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
363 }
364 }
365 Pin::new(&mut self.io).poll_flush(cx)
366 }
367
368 #[cfg(test)]
369 fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
370 futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
371 }
372}
373
374impl<T: Unpin, B> Unpin for Buffered<T, B> {}
376
377pub(crate) trait MemRead {
379 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
380}
381
382impl<T, B> MemRead for Buffered<T, B>
383where
384 T: AsyncRead + AsyncWrite + Unpin,
385 B: Buf,
386{
387 fn read_mem(&mut self, cx: &mut Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
388 if !self.read_buf.is_empty() {
389 let n = std::cmp::min(len, self.read_buf.len());
390 Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
391 } else {
392 let n = ready!(self.poll_read_from_io(cx))?;
393 Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
394 }
395 }
396}
397
398#[derive(Clone, Copy, Debug)]
399enum ReadStrategy {
400 Adaptive {
401 decrease_now: bool,
402 next: usize,
403 max: usize,
404 },
405 #[cfg(feature = "client")]
406 Exact(usize),
407}
408
409impl ReadStrategy {
410 fn with_max(max: usize) -> ReadStrategy {
411 ReadStrategy::Adaptive {
412 decrease_now: false,
413 next: INIT_BUFFER_SIZE,
414 max,
415 }
416 }
417
418 fn next(&self) -> usize {
419 match *self {
420 ReadStrategy::Adaptive { next, .. } => next,
421 #[cfg(feature = "client")]
422 ReadStrategy::Exact(exact) => exact,
423 }
424 }
425
426 fn max(&self) -> usize {
427 match *self {
428 ReadStrategy::Adaptive { max, .. } => max,
429 #[cfg(feature = "client")]
430 ReadStrategy::Exact(exact) => exact,
431 }
432 }
433
434 fn record(&mut self, bytes_read: usize) {
435 match *self {
436 ReadStrategy::Adaptive {
437 ref mut decrease_now,
438 ref mut next,
439 max,
440 ..
441 } => {
442 if bytes_read >= *next {
443 *next = cmp::min(incr_power_of_two(*next), max);
444 *decrease_now = false;
445 } else {
446 let decr_to = prev_power_of_two(*next);
447 if bytes_read < decr_to {
448 if *decrease_now {
449 *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
450 *decrease_now = false;
451 } else {
452 *decrease_now = true;
454 }
455 } else {
456 *decrease_now = false;
460 }
461 }
462 }
463 #[cfg(feature = "client")]
464 ReadStrategy::Exact(_) => (),
465 }
466 }
467}
468
469fn incr_power_of_two(n: usize) -> usize {
470 n.saturating_mul(2)
471}
472
473fn prev_power_of_two(n: usize) -> usize {
474 debug_assert!(n >= 4);
477 (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
478}
479
480impl Default for ReadStrategy {
481 fn default() -> ReadStrategy {
482 ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
483 }
484}
485
486#[derive(Clone)]
487pub(crate) struct Cursor<T> {
488 bytes: T,
489 pos: usize,
490}
491
492impl<T: AsRef<[u8]>> Cursor<T> {
493 #[inline]
494 pub(crate) fn new(bytes: T) -> Cursor<T> {
495 Cursor { bytes, pos: 0 }
496 }
497}
498
499impl Cursor<Vec<u8>> {
500 fn maybe_unshift(&mut self, additional: usize) {
504 if self.pos == 0 {
505 return;
507 }
508
509 if self.bytes.capacity() - self.bytes.len() >= additional {
510 return;
512 }
513
514 self.bytes.drain(0..self.pos);
515 self.pos = 0;
516 }
517
518 fn reset(&mut self) {
519 self.pos = 0;
520 self.bytes.clear();
521 }
522}
523
524impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 f.debug_struct("Cursor")
527 .field("pos", &self.pos)
528 .field("len", &self.bytes.as_ref().len())
529 .finish()
530 }
531}
532
533impl<T: AsRef<[u8]>> Buf for Cursor<T> {
534 #[inline]
535 fn remaining(&self) -> usize {
536 self.bytes.as_ref().len() - self.pos
537 }
538
539 #[inline]
540 fn chunk(&self) -> &[u8] {
541 &self.bytes.as_ref()[self.pos..]
542 }
543
544 #[inline]
545 fn advance(&mut self, cnt: usize) {
546 debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
547 self.pos += cnt;
548 }
549}
550
551pub(super) struct WriteBuf<B> {
553 headers: Cursor<Vec<u8>>,
555 max_buf_size: usize,
556 queue: BufList<B>,
558 strategy: WriteStrategy,
559}
560
561impl<B: Buf> WriteBuf<B> {
562 fn new(strategy: WriteStrategy) -> WriteBuf<B> {
563 WriteBuf {
564 headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
565 max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
566 queue: BufList::new(),
567 strategy,
568 }
569 }
570}
571
572impl<B> WriteBuf<B>
573where
574 B: Buf,
575{
576 fn set_strategy(&mut self, strategy: WriteStrategy) {
577 self.strategy = strategy;
578 }
579
580 pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
581 debug_assert!(buf.has_remaining());
582 match self.strategy {
583 WriteStrategy::Flatten => {
584 let head = self.headers_mut();
585
586 head.maybe_unshift(buf.remaining());
587 trace!(
588 self.len = head.remaining(),
589 buf.len = buf.remaining(),
590 "buffer.flatten"
591 );
592 loop {
595 let adv = {
596 let slice = buf.chunk();
597 if slice.is_empty() {
598 return;
599 }
600 head.bytes.extend_from_slice(slice);
601 slice.len()
602 };
603 buf.advance(adv);
604 }
605 }
606 WriteStrategy::Queue => {
607 trace!(
608 self.len = self.remaining(),
609 buf.len = buf.remaining(),
610 "buffer.queue"
611 );
612 self.queue.push(buf.into());
613 }
614 }
615 }
616
617 fn can_buffer(&self) -> bool {
618 match self.strategy {
619 WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
620 WriteStrategy::Queue => {
621 self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
622 }
623 }
624 }
625
626 fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
627 debug_assert!(!self.queue.has_remaining());
628 &mut self.headers
629 }
630}
631
632impl<B: Buf> fmt::Debug for WriteBuf<B> {
633 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634 f.debug_struct("WriteBuf")
635 .field("remaining", &self.remaining())
636 .field("strategy", &self.strategy)
637 .finish()
638 }
639}
640
641impl<B: Buf> Buf for WriteBuf<B> {
642 #[inline]
643 fn remaining(&self) -> usize {
644 self.headers.remaining() + self.queue.remaining()
645 }
646
647 #[inline]
648 fn chunk(&self) -> &[u8] {
649 let headers = self.headers.chunk();
650 if !headers.is_empty() {
651 headers
652 } else {
653 self.queue.chunk()
654 }
655 }
656
657 #[inline]
658 fn advance(&mut self, cnt: usize) {
659 let hrem = self.headers.remaining();
660
661 match hrem.cmp(&cnt) {
662 cmp::Ordering::Equal => self.headers.reset(),
663 cmp::Ordering::Greater => self.headers.advance(cnt),
664 cmp::Ordering::Less => {
665 let qcnt = cnt - hrem;
666 self.headers.reset();
667 self.queue.advance(qcnt);
668 }
669 }
670 }
671
672 #[inline]
673 fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
674 let n = self.headers.chunks_vectored(dst);
675 self.queue.chunks_vectored(&mut dst[n..]) + n
676 }
677}
678
679#[derive(Debug)]
680enum WriteStrategy {
681 Flatten,
682 Queue,
683}
684
685#[cfg(test)]
686mod tests {
687 use super::*;
688 use std::time::Duration;
689
690 use tokio_test::io::Builder as Mock;
691
692 #[tokio::test]
706 #[ignore]
707 async fn iobuf_write_empty_slice() {
708 }
725
726 #[tokio::test]
727 async fn parse_reads_until_blocked() {
728 use crate::proto::h1::ClientTransaction;
729
730 let _ = pretty_env_logger::try_init();
731 let mock = Mock::new()
732 .read(b"HTTP/1.1 200 OK\r\n")
734 .read(b"Server: hyper\r\n")
735 .wait(Duration::from_secs(1))
737 .build();
738
739 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
740
741 futures_util::future::poll_fn(|cx| {
744 let parse_ctx = ParseContext {
745 cached_headers: &mut None,
746 req_method: &mut None,
747 h1_parser_config: Default::default(),
748 #[cfg(feature = "runtime")]
749 h1_header_read_timeout: None,
750 #[cfg(feature = "runtime")]
751 h1_header_read_timeout_fut: &mut None,
752 #[cfg(feature = "runtime")]
753 h1_header_read_timeout_running: &mut false,
754 preserve_header_case: false,
755 #[cfg(feature = "ffi")]
756 preserve_header_order: false,
757 h09_responses: false,
758 #[cfg(feature = "ffi")]
759 on_informational: &mut None,
760 #[cfg(feature = "ffi")]
761 raw_headers: false,
762 };
763 assert!(buffered
764 .parse::<ClientTransaction>(cx, parse_ctx)
765 .is_pending());
766 Poll::Ready(())
767 })
768 .await;
769
770 assert_eq!(
771 buffered.read_buf,
772 b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
773 );
774 }
775
776 #[test]
777 fn read_strategy_adaptive_increments() {
778 let mut strategy = ReadStrategy::default();
779 assert_eq!(strategy.next(), 8192);
780
781 strategy.record(8192);
783 assert_eq!(strategy.next(), 16384);
784
785 strategy.record(16384);
786 assert_eq!(strategy.next(), 32768);
787
788 strategy.record(::std::usize::MAX);
790 assert_eq!(strategy.next(), 65536);
791
792 let max = strategy.max();
793 while strategy.next() < max {
794 strategy.record(max);
795 }
796
797 assert_eq!(strategy.next(), max, "never goes over max");
798 strategy.record(max + 1);
799 assert_eq!(strategy.next(), max, "never goes over max");
800 }
801
802 #[test]
803 fn read_strategy_adaptive_decrements() {
804 let mut strategy = ReadStrategy::default();
805 strategy.record(8192);
806 assert_eq!(strategy.next(), 16384);
807
808 strategy.record(1);
809 assert_eq!(
810 strategy.next(),
811 16384,
812 "first smaller record doesn't decrement yet"
813 );
814 strategy.record(8192);
815 assert_eq!(strategy.next(), 16384, "record was with range");
816
817 strategy.record(1);
818 assert_eq!(
819 strategy.next(),
820 16384,
821 "in-range record should make this the 'first' again"
822 );
823
824 strategy.record(1);
825 assert_eq!(strategy.next(), 8192, "second smaller record decrements");
826
827 strategy.record(1);
828 assert_eq!(strategy.next(), 8192, "first doesn't decrement");
829 strategy.record(1);
830 assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
831 }
832
833 #[test]
834 fn read_strategy_adaptive_stays_the_same() {
835 let mut strategy = ReadStrategy::default();
836 strategy.record(8192);
837 assert_eq!(strategy.next(), 16384);
838
839 strategy.record(8193);
840 assert_eq!(
841 strategy.next(),
842 16384,
843 "first smaller record doesn't decrement yet"
844 );
845
846 strategy.record(8193);
847 assert_eq!(
848 strategy.next(),
849 16384,
850 "with current step does not decrement"
851 );
852 }
853
854 #[test]
855 fn read_strategy_adaptive_max_fuzz() {
856 fn fuzz(max: usize) {
857 let mut strategy = ReadStrategy::with_max(max);
858 while strategy.next() < max {
859 strategy.record(::std::usize::MAX);
860 }
861 let mut next = strategy.next();
862 while next > 8192 {
863 strategy.record(1);
864 strategy.record(1);
865 next = strategy.next();
866 assert!(
867 next.is_power_of_two(),
868 "decrement should be powers of two: {} (max = {})",
869 next,
870 max,
871 );
872 }
873 }
874
875 let mut max = 8192;
876 while max < std::usize::MAX {
877 fuzz(max);
878 max = (max / 2).saturating_mul(3);
879 }
880 fuzz(::std::usize::MAX);
881 }
882
883 #[test]
884 #[should_panic]
885 #[cfg(debug_assertions)] fn write_buf_requires_non_empty_bufs() {
887 let mock = Mock::new().build();
888 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
889
890 buffered.buffer(Cursor::new(Vec::new()));
891 }
892
893 #[tokio::test]
917 async fn write_buf_flatten() {
918 let _ = pretty_env_logger::try_init();
919
920 let mock = Mock::new().write(b"hello world, it's hyper!").build();
921
922 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
923 buffered.write_buf.set_strategy(WriteStrategy::Flatten);
924
925 buffered.headers_buf().extend(b"hello ");
926 buffered.buffer(Cursor::new(b"world, ".to_vec()));
927 buffered.buffer(Cursor::new(b"it's ".to_vec()));
928 buffered.buffer(Cursor::new(b"hyper!".to_vec()));
929 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
930
931 buffered.flush().await.expect("flush");
932 }
933
934 #[test]
935 fn write_buf_flatten_partially_flushed() {
936 let _ = pretty_env_logger::try_init();
937
938 let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
939
940 let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
941
942 write_buf.buffer(b("hello "));
943 write_buf.buffer(b("world, "));
944
945 assert_eq!(write_buf.chunk(), b"hello world, ");
946
947 write_buf.advance(11);
949
950 assert_eq!(write_buf.chunk(), b", ");
951 assert_eq!(write_buf.headers.pos, 11);
952 assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
953
954 write_buf.buffer(b("it's hyper!"));
956
957 assert_eq!(write_buf.chunk(), b", it's hyper!");
958 assert_eq!(write_buf.headers.pos, 11);
959
960 let rem1 = write_buf.remaining();
961 let cap = write_buf.headers.bytes.capacity();
962
963 write_buf.buffer(Cursor::new(vec![b'X'; cap]));
965 assert_eq!(write_buf.remaining(), cap + rem1);
966 assert_eq!(write_buf.headers.pos, 0);
967 }
968
969 #[tokio::test]
970 async fn write_buf_queue_disable_auto() {
971 let _ = pretty_env_logger::try_init();
972
973 let mock = Mock::new()
974 .write(b"hello ")
975 .write(b"world, ")
976 .write(b"it's ")
977 .write(b"hyper!")
978 .build();
979
980 let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
981 buffered.write_buf.set_strategy(WriteStrategy::Queue);
982
983 buffered.headers_buf().extend(b"hello ");
987 buffered.buffer(Cursor::new(b"world, ".to_vec()));
988 buffered.buffer(Cursor::new(b"it's ".to_vec()));
989 buffered.buffer(Cursor::new(b"hyper!".to_vec()));
990 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
991
992 buffered.flush().await.expect("flush");
993
994 assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
995 }
996
997 }