1#![forbid(unsafe_code)]
2
3use alloc::collections::VecDeque;
4use alloc::vec::Vec;
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use core::cmp;
7use core::iter::FromIterator;
8
9#[cfg(feature = "std")]
10use std::io::IoSlice;
11
12#[cfg(feature = "std")]
13fn chunks_vectored<'s, B, I>(bufs: I, dst: &mut [IoSlice<'s>]) -> usize
14where
15 I: Iterator<Item = &'s B>,
16 B: Buf + 's,
17{
18 let mut filled = 0;
19 for buf in bufs {
20 if filled == dst.len() {
21 break;
22 }
23 filled += buf.chunks_vectored(&mut dst[filled..]);
24 }
25 filled
26}
27
28#[derive(Debug, Default)]
64pub struct SegmentedSlice<'a, B> {
65 remaining: usize,
66 idx: usize,
67 bufs: &'a mut [B],
68}
69
70impl<'a, B: Buf> SegmentedSlice<'a, B> {
71 pub fn new(bufs: &'a mut [B]) -> Self {
79 let remaining = bufs.iter().map(Buf::remaining).sum();
80 let mut me = Self {
81 remaining,
82 idx: 0,
83 bufs,
84 };
85 me.clean_empty();
86 me
87 }
88
89 fn clean_empty(&mut self) {
90 while self.idx < self.bufs.len() && !self.bufs[self.idx].has_remaining() {
91 self.idx += 1;
92 }
93 }
94}
95
96impl<'a, B: Buf> Buf for SegmentedSlice<'a, B> {
97 fn remaining(&self) -> usize {
98 self.remaining
99 }
100
101 fn chunk(&self) -> &[u8] {
102 self.bufs.get(self.idx).map(Buf::chunk).unwrap_or_default()
103 }
104
105 fn advance(&mut self, mut cnt: usize) {
106 self.remaining -= cnt;
107 while cnt > 0 {
108 let first = &mut self.bufs[self.idx];
109 let rem = first.remaining();
110 let segment = cmp::min(rem, cnt);
111 first.advance(segment);
112 cnt -= segment;
113 self.clean_empty();
114 }
115 }
116
117 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
118 assert!(len <= self.remaining(), "`len` greater than remaining");
119 match self.bufs.get_mut(self.idx) {
120 Some(front) if front.remaining() >= len => {
124 self.remaining -= len;
125 let res = front.copy_to_bytes(len);
126 self.clean_empty();
127 res
128 }
129 _ => {
132 let mut res = BytesMut::with_capacity(len);
133 res.put(self.take(len));
134 res.freeze()
135 }
136 }
137 }
138
139 #[cfg(feature = "std")]
140 fn chunks_vectored<'s>(&'s self, dst: &mut [IoSlice<'s>]) -> usize {
141 let bufs = self.bufs.get(self.idx..).unwrap_or_default();
142 chunks_vectored(bufs.iter(), dst)
143 }
144}
145
146#[derive(Clone, Debug)]
245pub struct SegmentedBuf<B> {
246 bufs: VecDeque<B>,
247 remaining: usize,
249}
250
251impl<B> SegmentedBuf<B> {
252 pub fn new() -> Self {
259 Self::default()
260 }
261
262 pub fn into_inner(self) -> VecDeque<B> {
264 self.into()
265 }
266
267 pub fn segments(&self) -> usize {
269 self.bufs.len()
270 }
271}
272
273impl<B: Buf> SegmentedBuf<B> {
274 pub fn push(&mut self, buf: B) {
278 self.remaining += buf.remaining();
279 self.bufs.push_back(buf);
280 self.clean_empty();
281 }
282 fn update_remaining(&mut self) {
283 self.remaining = self.bufs.iter().map(Buf::remaining).sum();
284 }
285 fn clean_empty(&mut self) {
286 loop {
287 match self.bufs.front() {
288 Some(b) if !b.has_remaining() => {
289 self.bufs.pop_front();
290 }
291 _ => break,
292 }
293 }
294 }
295}
296
297impl<B> Default for SegmentedBuf<B> {
298 fn default() -> Self {
299 Self {
300 bufs: VecDeque::new(),
301 remaining: 0,
302 }
303 }
304}
305
306impl<B: Buf> From<Vec<B>> for SegmentedBuf<B> {
307 fn from(bufs: Vec<B>) -> Self {
308 Self::from(VecDeque::from(bufs))
309 }
310}
311
312impl<B: Buf> From<VecDeque<B>> for SegmentedBuf<B> {
313 fn from(bufs: VecDeque<B>) -> Self {
314 let mut me = Self { bufs, remaining: 0 };
315 me.clean_empty();
316 me.update_remaining();
317 me
318 }
319}
320
321impl<B> From<SegmentedBuf<B>> for VecDeque<B> {
322 fn from(me: SegmentedBuf<B>) -> Self {
323 me.bufs
324 }
325}
326
327impl<B: Buf> Extend<B> for SegmentedBuf<B> {
328 fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
329 self.bufs.extend(iter);
330 self.clean_empty();
331 self.update_remaining();
332 }
333}
334
335impl<B: Buf> FromIterator<B> for SegmentedBuf<B> {
336 fn from_iter<T: IntoIterator<Item = B>>(iter: T) -> Self {
337 let mut me = Self {
338 bufs: VecDeque::from_iter(iter),
339 remaining: 0,
340 };
341 me.clean_empty();
342 me.update_remaining();
343 me
344 }
345}
346
347impl<B: Buf> Buf for SegmentedBuf<B> {
348 fn remaining(&self) -> usize {
349 self.remaining
350 }
351
352 fn chunk(&self) -> &[u8] {
353 self.bufs.front().map(Buf::chunk).unwrap_or_default()
354 }
355
356 fn advance(&mut self, mut cnt: usize) {
357 assert!(cnt <= self.remaining, "Advance past the end of buffer");
358 self.remaining -= cnt;
359 while cnt > 0 {
360 let front = self
361 .bufs
362 .front_mut()
363 .expect("Missing buffers to provide remaining");
364 let front_remaining = front.remaining();
365 if front_remaining >= cnt {
366 front.advance(cnt);
367 break;
368 } else {
369 cnt -= front_remaining;
371 self.bufs.pop_front();
372 }
373 }
374 self.clean_empty();
375 }
376
377 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
378 assert!(len <= self.remaining(), "`len` greater than remaining");
379 match self.bufs.front_mut() {
380 Some(front) if front.remaining() >= len => {
384 self.remaining -= len;
385 let res = front.copy_to_bytes(len);
386 self.clean_empty();
387 res
388 }
389 _ => {
392 let mut res = BytesMut::with_capacity(len);
393 res.put(self.take(len));
394 res.freeze()
395 }
396 }
397 }
398
399 #[cfg(feature = "std")]
400 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
401 chunks_vectored(self.bufs.iter(), dst)
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use std::io::Read;
408
409 use super::*;
410
411 #[test]
412 fn empty() {
413 let mut b = SegmentedBuf::<Bytes>::new();
414
415 assert!(!b.has_remaining());
416 assert_eq!(0, b.remaining());
417 assert!(b.chunk().is_empty());
418 assert_eq!(0, b.segments());
419
420 b.copy_to_slice(&mut []);
421 b.advance(0);
422 assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
423 }
424
425 #[test]
426 fn empty_slices() {
427 let mut b = SegmentedSlice::<&[u8]>::default();
428
429 assert!(!b.has_remaining());
430 assert_eq!(0, b.remaining());
431 assert!(b.chunk().is_empty());
432
433 b.copy_to_slice(&mut []);
434 b.advance(0);
435 assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
436 }
437
438 fn segmented() -> SegmentedBuf<Bytes> {
439 vec![
440 Bytes::from("Hello"),
441 Bytes::from(" "),
442 Bytes::new(),
443 Bytes::from("World"),
444 ]
445 .into()
446 }
447
448 #[test]
449 fn segments() {
450 let mut b = segmented();
451 assert_eq!(11, b.remaining());
452 assert_eq!(b"Hello", b.chunk());
453 assert_eq!(4, b.segments());
454 b.advance(3);
455 assert_eq!(8, b.remaining());
456 assert_eq!(b"lo", b.chunk());
457 assert_eq!(4, b.segments());
458 }
459
460 #[test]
461 fn to_bytes_all() {
462 let mut b = segmented();
463 let bytes = b.copy_to_bytes(11);
464 assert_eq!("Hello World", &bytes);
465 }
466
467 #[test]
468 fn advance_within() {
469 let mut b = segmented();
470 b.advance(2);
471 assert_eq!(4, b.segments());
472 assert_eq!(9, b.remaining());
473 assert_eq!(b"llo", b.chunk());
474 }
475
476 #[test]
477 fn advance_border() {
478 let mut b = segmented();
479 b.advance(5);
480 assert_eq!(3, b.segments());
481 assert_eq!(6, b.remaining());
482 assert_eq!(b" ", b.chunk());
483 }
484
485 #[test]
486 fn advance_across() {
487 let mut b = segmented();
488 b.advance(7);
489 assert_eq!(1, b.segments());
490 assert_eq!(4, b.remaining());
491 assert_eq!(b"orld", b.chunk());
492 }
493
494 #[test]
495 fn empty_at_border() {
496 let mut b = segmented();
497 b.advance(6);
498 assert_eq!(1, b.segments());
499 assert_eq!(5, b.remaining());
500 assert_eq!(b"World", b.chunk());
501 }
502
503 #[test]
504 fn empty_bufs() {
505 fn is_empty(b: &SegmentedBuf<Bytes>) {
506 assert_eq!(0, b.segments());
507 assert_eq!(0, b.remaining());
508 assert_eq!(b"", b.chunk());
509 }
510
511 is_empty(&vec![].into());
512 is_empty(&vec![Bytes::new(), Bytes::new()].into());
513 is_empty(&vec![Bytes::new(), Bytes::new()].into_iter().collect());
514
515 let mut b = SegmentedBuf::new();
516 is_empty(&b);
517 b.push(Bytes::new());
518 is_empty(&b);
519 b.extend(vec![Bytes::new(), Bytes::new()]);
520 is_empty(&b);
521 }
522
523 #[test]
524 fn sliced_hello() {
525 let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
526 let buf = SegmentedSlice::new(&mut buffers);
527
528 assert_eq!(11, buf.remaining());
529 assert_eq!(b"Hello", buf.chunk());
530
531 let mut out = String::new();
532 buf.reader()
533 .read_to_string(&mut out)
534 .expect("Doesn't cause IO errors");
535 assert_eq!("Hello World", out);
536 }
537
538 #[test]
539 fn chunk_vectored() {
540 let mut b = segmented();
541 assert_eq!(b.chunks_vectored(&mut []), 0);
542 let mut slices = [IoSlice::new(&[]); 5];
543 assert_eq!(b.segments(), 4);
544 assert_eq!(b.chunks_vectored(&mut slices), 3);
545 assert_eq!(&*slices[0], b"Hello");
546 assert_eq!(&*slices[1], b" ");
547 assert_eq!(&*slices[2], b"World");
548 b.advance(2);
549 let mut slices = [IoSlice::new(&[]); 1];
550 assert_eq!(b.chunks_vectored(&mut slices), 1);
551 assert_eq!(&*slices[0], b"llo");
552 }
553
554 #[test]
555 fn chunk_vectored_nested() {
556 let mut bufs = [segmented(), segmented()];
557 let mut bufs = SegmentedSlice::new(&mut bufs);
558 let mut slices = [IoSlice::new(&[]); 10];
559 assert_eq!(bufs.chunks_vectored(&mut slices), 6);
560 assert_eq!(&*slices[0], b"Hello");
561 assert_eq!(&*slices[1], b" ");
562 assert_eq!(&*slices[2], b"World");
563 assert_eq!(&*slices[3], b"Hello");
564 assert_eq!(&*slices[4], b" ");
565 assert_eq!(&*slices[5], b"World");
566 bufs.advance(2);
567 let mut slices = [IoSlice::new(&[]); 1];
568 assert_eq!(bufs.chunks_vectored(&mut slices), 1);
569 assert_eq!(&*slices[0], b"llo");
570 }
571
572 #[cfg(not(miri))]
573 mod proptests {
574
575 use super::*;
576 use proptest::prelude::*;
577 use std::ops::Deref;
578
579 proptest! {
580 #[test]
581 fn random(bufs: Vec<Vec<u8>>, splits in proptest::collection::vec(0..10usize, 1..10)) {
582 let concat: Vec<u8> = bufs.iter().flat_map(|b| b.iter()).copied().collect();
583 let mut segmented = bufs.iter()
584 .map(|b| &b[..])
585 .collect::<SegmentedBuf<_>>();
586 assert_eq!(concat.len(), segmented.remaining());
587 assert!(segmented.segments() <= bufs.len());
588 assert!(concat.starts_with(segmented.chunk()));
589 let mut bytes = segmented.clone().copy_to_bytes(segmented.remaining());
590 assert_eq!(&concat[..], &bytes[..]);
591 let mut sliced = bufs.iter().map(Deref::deref).collect::<Vec<&[u8]>>();
592 let mut sliced = SegmentedSlice::new(&mut sliced);
593
594 let mut fifo = SegmentedBuf::new();
595 let mut buf_pos = bufs.iter();
596
597 for split in splits {
598 if !bytes.has_remaining() {
599 break;
600 }
601 let split = cmp::min(bytes.remaining(), split);
602 while fifo.remaining() < split {
603 fifo.push(&buf_pos.next().unwrap()[..]);
604 }
605 let c1 = bytes.copy_to_bytes(split);
606 let c2 = segmented.copy_to_bytes(split);
607 let c3 = sliced.copy_to_bytes(split);
608 assert_eq!(c1, c2);
609 assert_eq!(c1, c3);
610 assert_eq!(bytes.remaining(), segmented.remaining());
611 assert_eq!(bytes.remaining(), sliced.remaining());
612 }
613 }
614 }
615 }
616}