bytes_utils/
segmented.rs

1#![forbid(unsafe_code)]
2
3use alloc::collections::VecDeque;
4use alloc::vec::Vec;
5use bytes::{Buf, BufMut, Bytes, BytesMut};
6use core::cmp;
7use core::iter::FromIterator;
8
9#[cfg(feature = "std")]
10use std::io::IoSlice;
11
/// Gathers `IoSlice`s from a sequence of buffers into `dst`.
///
/// Walks the buffers in order and lets each one write as many slices as fit
/// into the still-unused tail of `dst`. Returns how many slots of `dst` were
/// filled in total.
#[cfg(feature = "std")]
fn chunks_vectored<'s, B, I>(bufs: I, dst: &mut [IoSlice<'s>]) -> usize
where
    I: Iterator<Item = &'s B>,
    B: Buf + 's,
{
    let mut used = 0;
    for buf in bufs {
        if used >= dst.len() {
            // No space left for further slices.
            break;
        }
        used += buf.chunks_vectored(&mut dst[used..]);
    }
    used
}
27
28/// A consumable view of a sequence of buffers.
29///
30/// This allows viewing a sequence of buffers as one buffer, without copying the bytes over. Unlike
31/// the [SegmentedBuf], this doesn't allow for appending more buffers and doesn't drop the buffers
32/// as they are exhausted (though they all get exhausted, no leftovers are kept in them as the
33/// caller advances through it). On the other hand, it doesn't require an internal allocation in
34/// the form of VecDeque and can be based on any kind of slice.
35///
36/// # Example
37///
38/// ```rust
39/// # use bytes_utils::SegmentedSlice;
40/// # use bytes::Buf;
41/// # use std::io::Read;
42/// let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
43/// let buf = SegmentedSlice::new(&mut buffers);
44///
45/// assert_eq!(11, buf.remaining());
46/// assert_eq!(b"Hello", buf.chunk());
47///
48/// let mut out = String::new();
49/// buf.reader().read_to_string(&mut out).expect("Doesn't cause IO errors");
50/// assert_eq!("Hello World", out);
51/// ```
52///
53/// # Optimizations
54///
55/// The [copy_to_bytes][SegmentedSlice::copy_to_bytes] method tries to avoid copies by delegating
56/// into the underlying buffer if possible (if the whole request can be fulfilled using only a
57/// single buffer). If that one is optimized (for example, the [Bytes] returns a shared instance
58/// instead of making a copy), the copying is avoided. If the request is across a buffer boundary,
59/// a copy is made.
60///
61/// The [chunks_vectored][SegmentedSlice::chunks_vectored] will properly output as many slices as
62/// possible, not just 1 as the default implementation does.
#[derive(Debug, Default)]
pub struct SegmentedSlice<'a, B> {
    // Pre-computed number of bytes left across all buffers from `idx` onwards.
    remaining: usize,
    // Index of the first buffer that still has data; everything before it has
    // already been exhausted (buffers are kept, not dropped).
    idx: usize,
    // The underlying sequence of buffers being viewed as one.
    bufs: &'a mut [B],
}
69
70impl<'a, B: Buf> SegmentedSlice<'a, B> {
71    /// Creates a new buffer out of a slice of buffers.
72    ///
73    /// The buffers will then be taken in order to form one bigger buffer.
74    ///
75    /// Each of the buffers in turn will be exhausted using its [advance][Buf::advance] before
76    /// proceeding to the next one. Note that the buffers are not dropped (unlike with
77    /// [SegmentedBuf]).
78    pub fn new(bufs: &'a mut [B]) -> Self {
79        let remaining = bufs.iter().map(Buf::remaining).sum();
80        let mut me = Self {
81            remaining,
82            idx: 0,
83            bufs,
84        };
85        me.clean_empty();
86        me
87    }
88
89    fn clean_empty(&mut self) {
90        while self.idx < self.bufs.len() && !self.bufs[self.idx].has_remaining() {
91            self.idx += 1;
92        }
93    }
94}
95
96impl<'a, B: Buf> Buf for SegmentedSlice<'a, B> {
97    fn remaining(&self) -> usize {
98        self.remaining
99    }
100
101    fn chunk(&self) -> &[u8] {
102        self.bufs.get(self.idx).map(Buf::chunk).unwrap_or_default()
103    }
104
105    fn advance(&mut self, mut cnt: usize) {
106        self.remaining -= cnt;
107        while cnt > 0 {
108            let first = &mut self.bufs[self.idx];
109            let rem = first.remaining();
110            let segment = cmp::min(rem, cnt);
111            first.advance(segment);
112            cnt -= segment;
113            self.clean_empty();
114        }
115    }
116
117    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
118        assert!(len <= self.remaining(), "`len` greater than remaining");
119        match self.bufs.get_mut(self.idx) {
120            // Special optimized case. The whole request comes from the front buffer. That one may
121            // be optimized to do something more efficient, like slice the Bytes (if B == Bytes)
122            // instead of copying, so we take the opportunity if it offers itself.
123            Some(front) if front.remaining() >= len => {
124                self.remaining -= len;
125                let res = front.copy_to_bytes(len);
126                self.clean_empty();
127                res
128            }
129            // The general case, borrowed from the default implementation (there's no way to
130            // delegate to it, is there?)
131            _ => {
132                let mut res = BytesMut::with_capacity(len);
133                res.put(self.take(len));
134                res.freeze()
135            }
136        }
137    }
138
139    #[cfg(feature = "std")]
140    fn chunks_vectored<'s>(&'s self, dst: &mut [IoSlice<'s>]) -> usize {
141        let bufs = self.bufs.get(self.idx..).unwrap_or_default();
142        chunks_vectored(bufs.iter(), dst)
143    }
144}
145
146/// A concatenation of multiple buffers into a large one, without copying the bytes over.
147///
148/// Note that this doesn't provide a continuous slice view into them, it is split into the segments
149/// of the original smaller buffers.
150///
/// This variant drops the inner buffers as they are exhausted and new ones can be added. But it
152/// internally keeps a [VecDeque], therefore needs a heap allocation. If you don't need the
153/// extending behaviour, but want to avoid the allocation, the [SegmentedSlice] can be used instead.
154///
155/// # Why
156///
157/// This can be used, for example, if data of unknown length is coming over the network (for
158/// example, the bodies in [hyper] act a bit like this, it returns a stream of [Bytes] buffers).
159/// One might want to accumulate the whole body before acting on it, possibly by parsing it through
160/// [serde] or [prost]. Options would include:
161///
/// * Have a `Vec<u8>` and extend it with each chunk. This needlessly copies the bytes every time
163///   reallocates if the vector grows too large.
164/// * Repeatedly use [chain][Buf::chain], but this changes the type of the whole buffer, therefore
165///   needs to be boxed.
166/// * Use [hyper::body::aggregate] to create a [Buf] implementation that concatenates all of them
167///   together, but lacks any kind of flexibility (like protecting against loading too much data
168///   into memory).
169///
170/// This type allows for concatenating multiple buffers, either all at once, or by incrementally
171/// pushing more buffers to the end.
172///
173/// # Heterogeneous buffers
174///
175/// This expects all the buffers are of the same type. If different-typed buffers are needed, one
176/// needs to use dynamic dispatch, either something like `SegmentedBuf<Box<Buf>>` or
177/// `SegmentedBuf<&mut Buf>`.
178///
179/// # Example
180///
181/// ```rust
182/// # use std::io::Read;
183/// # use bytes::{Bytes, Buf};
184/// # use bytes_utils::SegmentedBuf;
185/// let mut buf = SegmentedBuf::new();
186/// buf.push(Bytes::from("Hello"));
187/// buf.push(Bytes::from(" "));
188/// buf.push(Bytes::from("World"));
189///
190/// assert_eq!(3, buf.segments());
191/// assert_eq!(11, buf.remaining());
192/// assert_eq!(b"Hello", buf.chunk());
193///
194/// let mut out = String::new();
195/// buf.reader().read_to_string(&mut out).expect("Doesn't cause IO errors");
196/// assert_eq!("Hello World", out);
197/// ```
198///
199/// # FIFO behaviour
200///
201/// The buffers are dropped once their data are completely consumed. Additionally, it is possible
202/// to add more buffers to the end, even while some of the previous buffers were partially or fully
203/// consumed. That makes it usable as kind of a queue (that operates on the buffers, not individual
204/// bytes).
205///
206/// ```rust
207/// # use bytes::{Bytes, Buf};
208/// # use bytes_utils::SegmentedBuf;
209/// let mut buf = SegmentedBuf::new();
210/// buf.push(Bytes::from("Hello"));
211/// assert_eq!(1, buf.segments());
212///
213/// let mut out = [0; 3];
214/// buf.copy_to_slice(&mut out);
215/// assert_eq!(&out, b"Hel");
216/// assert_eq!(2, buf.remaining());
217/// assert_eq!(1, buf.segments());
218///
219/// buf.push(Bytes::from("World"));
220/// assert_eq!(7, buf.remaining());
221/// assert_eq!(2, buf.segments());
222///
223/// buf.copy_to_slice(&mut out);
224/// assert_eq!(&out, b"loW");
225/// assert_eq!(4, buf.remaining());
226/// assert_eq!(1, buf.segments());
227/// ```
228///
229/// # Optimizations
230///
231/// The [copy_to_bytes][SegmentedBuf::copy_to_bytes] method tries to avoid copies by delegating
232/// into the underlying buffer if possible (if the whole request can be fulfilled using only a
233/// single buffer). If that one is optimized (for example, the [Bytes] returns a shared instance
234/// instead of making a copy), the copying is avoided. If the request is across a buffer boundary,
235/// a copy is made.
236///
237/// The [chunks_vectored][SegmentedBuf::chunks_vectored] will properly output as many slices as
238/// possible, not just 1 as the default implementation does.
239///
240/// [hyper]: https://docs.rs/hyper
241/// [serde]: https://docs.rs/serde
242/// [prost]: https://docs.rs/prost
243/// [hyper::body::aggregate]: https://docs.rs/hyper/0.14.2/hyper/body/fn.aggregate.html
#[derive(Clone, Debug)]
pub struct SegmentedBuf<B> {
    // The not-yet-exhausted buffers, in FIFO order. The front buffer is
    // always non-empty (or the deque is empty) — see `clean_empty`.
    bufs: VecDeque<B>,
    // Pre-computed sum of the total remaining
    remaining: usize,
}
250
251impl<B> SegmentedBuf<B> {
252    /// Creates a new empty instance.
253    ///
254    /// The instance can be [pushed][SegmentedBuf::push] or [extended][Extend] later.
255    ///
256    /// Alternatively, one may create it directly from an iterator, a [Vec] or a [VecDeque] of
257    /// buffers.
258    pub fn new() -> Self {
259        Self::default()
260    }
261
262    /// Returns the yet unconsumed sequence of buffers.
263    pub fn into_inner(self) -> VecDeque<B> {
264        self.into()
265    }
266
267    /// Returns the number of segments (buffers) this contains.
268    pub fn segments(&self) -> usize {
269        self.bufs.len()
270    }
271}
272
273impl<B: Buf> SegmentedBuf<B> {
274    /// Extends the buffer by another segment.
275    ///
276    /// The newly added segment is added to the end of the buffer (the buffer works as a FIFO).
277    pub fn push(&mut self, buf: B) {
278        self.remaining += buf.remaining();
279        self.bufs.push_back(buf);
280        self.clean_empty();
281    }
282    fn update_remaining(&mut self) {
283        self.remaining = self.bufs.iter().map(Buf::remaining).sum();
284    }
285    fn clean_empty(&mut self) {
286        loop {
287            match self.bufs.front() {
288                Some(b) if !b.has_remaining() => {
289                    self.bufs.pop_front();
290                }
291                _ => break,
292            }
293        }
294    }
295}
296
297impl<B> Default for SegmentedBuf<B> {
298    fn default() -> Self {
299        Self {
300            bufs: VecDeque::new(),
301            remaining: 0,
302        }
303    }
304}
305
306impl<B: Buf> From<Vec<B>> for SegmentedBuf<B> {
307    fn from(bufs: Vec<B>) -> Self {
308        Self::from(VecDeque::from(bufs))
309    }
310}
311
312impl<B: Buf> From<VecDeque<B>> for SegmentedBuf<B> {
313    fn from(bufs: VecDeque<B>) -> Self {
314        let mut me = Self { bufs, remaining: 0 };
315        me.clean_empty();
316        me.update_remaining();
317        me
318    }
319}
320
321impl<B> From<SegmentedBuf<B>> for VecDeque<B> {
322    fn from(me: SegmentedBuf<B>) -> Self {
323        me.bufs
324    }
325}
326
327impl<B: Buf> Extend<B> for SegmentedBuf<B> {
328    fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
329        self.bufs.extend(iter);
330        self.clean_empty();
331        self.update_remaining();
332    }
333}
334
335impl<B: Buf> FromIterator<B> for SegmentedBuf<B> {
336    fn from_iter<T: IntoIterator<Item = B>>(iter: T) -> Self {
337        let mut me = Self {
338            bufs: VecDeque::from_iter(iter),
339            remaining: 0,
340        };
341        me.clean_empty();
342        me.update_remaining();
343        me
344    }
345}
346
impl<B: Buf> Buf for SegmentedBuf<B> {
    // O(1): the total is pre-computed and maintained by push/advance/extend.
    fn remaining(&self) -> usize {
        self.remaining
    }

    // The front buffer is non-empty by invariant (`clean_empty`), so this is
    // the next data to consume; an empty deque yields an empty chunk.
    fn chunk(&self) -> &[u8] {
        self.bufs.front().map(Buf::chunk).unwrap_or_default()
    }

    fn advance(&mut self, mut cnt: usize) {
        assert!(cnt <= self.remaining, "Advance past the end of buffer");
        // Account up front; the loop below only distributes `cnt` over the
        // individual front buffers.
        self.remaining -= cnt;
        while cnt > 0 {
            let front = self
                .bufs
                .front_mut()
                .expect("Missing buffers to provide remaining");
            let front_remaining = front.remaining();
            if front_remaining >= cnt {
                // The rest of the advance fits into the front buffer.
                front.advance(cnt);
                break;
            } else {
                // We advance past the whole front buffer
                cnt -= front_remaining;
                self.bufs.pop_front();
            }
        }
        // The front buffer may have become exactly empty; drop such buffers.
        self.clean_empty();
    }

    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        assert!(len <= self.remaining(), "`len` greater than remaining");
        match self.bufs.front_mut() {
            // Special optimized case. The whole request comes from the front buffer. That one may
            // be optimized to do something more efficient, like slice the Bytes (if B == Bytes)
            // instead of copying, so we take the opportunity if it offers itself.
            Some(front) if front.remaining() >= len => {
                self.remaining -= len;
                let res = front.copy_to_bytes(len);
                self.clean_empty();
                res
            }
            // The general case, borrowed from the default implementation (there's no way to
            // delegate to it, is there?)
            _ => {
                let mut res = BytesMut::with_capacity(len);
                res.put(self.take(len));
                res.freeze()
            }
        }
    }

    // Outputs as many slices as fit into `dst`, not just the first chunk.
    #[cfg(feature = "std")]
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        chunks_vectored(self.bufs.iter(), dst)
    }
}
404
#[cfg(test)]
mod tests {
    use std::io::Read;

    use super::*;

    // A freshly created SegmentedBuf must behave like a fully exhausted Buf.
    #[test]
    fn empty() {
        let mut b = SegmentedBuf::<Bytes>::new();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());
        assert_eq!(0, b.segments());

        // Zero-sized operations on an empty buffer must not panic.
        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    // Same as `empty`, but for the slice-based variant.
    #[test]
    fn empty_slices() {
        let mut b = SegmentedSlice::<&[u8]>::default();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());

        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    // Shared fixture: "Hello World" split into 4 segments, one of them empty
    // (the empty one is not at the front, so it is kept by the constructor).
    fn segmented() -> SegmentedBuf<Bytes> {
        vec![
            Bytes::from("Hello"),
            Bytes::from(" "),
            Bytes::new(),
            Bytes::from("World"),
        ]
        .into()
    }

    // Advancing within the first segment keeps all segments alive.
    #[test]
    fn segments() {
        let mut b = segmented();
        assert_eq!(11, b.remaining());
        assert_eq!(b"Hello", b.chunk());
        assert_eq!(4, b.segments());
        b.advance(3);
        assert_eq!(8, b.remaining());
        assert_eq!(b"lo", b.chunk());
        assert_eq!(4, b.segments());
    }

    // copy_to_bytes across all segments concatenates them.
    #[test]
    fn to_bytes_all() {
        let mut b = segmented();
        let bytes = b.copy_to_bytes(11);
        assert_eq!("Hello World", &bytes);
    }

    // Advance that stays strictly inside the first segment.
    #[test]
    fn advance_within() {
        let mut b = segmented();
        b.advance(2);
        assert_eq!(4, b.segments());
        assert_eq!(9, b.remaining());
        assert_eq!(b"llo", b.chunk());
    }

    // Advancing exactly to a segment boundary drops the exhausted segment.
    #[test]
    fn advance_border() {
        let mut b = segmented();
        b.advance(5);
        assert_eq!(3, b.segments());
        assert_eq!(6, b.remaining());
        assert_eq!(b" ", b.chunk());
    }

    // Advancing across a boundary drops every fully consumed segment,
    // including the empty one in the middle of the fixture.
    #[test]
    fn advance_across() {
        let mut b = segmented();
        b.advance(7);
        assert_eq!(1, b.segments());
        assert_eq!(4, b.remaining());
        assert_eq!(b"orld", b.chunk());
    }

    // An empty segment sitting right at the border must be skipped over.
    #[test]
    fn empty_at_border() {
        let mut b = segmented();
        b.advance(6);
        assert_eq!(1, b.segments());
        assert_eq!(5, b.remaining());
        assert_eq!(b"World", b.chunk());
    }

    // Empty buffers are dropped eagerly by every construction path.
    #[test]
    fn empty_bufs() {
        fn is_empty(b: &SegmentedBuf<Bytes>) {
            assert_eq!(0, b.segments());
            assert_eq!(0, b.remaining());
            assert_eq!(b"", b.chunk());
        }

        is_empty(&vec![].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into_iter().collect());

        let mut b = SegmentedBuf::new();
        is_empty(&b);
        b.push(Bytes::new());
        is_empty(&b);
        b.extend(vec![Bytes::new(), Bytes::new()]);
        is_empty(&b);
    }

    // SegmentedSlice over plain byte slices, including empty ones.
    #[test]
    fn sliced_hello() {
        let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
        let buf = SegmentedSlice::new(&mut buffers);

        assert_eq!(11, buf.remaining());
        assert_eq!(b"Hello", buf.chunk());

        let mut out = String::new();
        buf.reader()
            .read_to_string(&mut out)
            .expect("Doesn't cause IO errors");
        assert_eq!("Hello World", out);
    }

    // chunks_vectored skips the empty segment: 4 segments produce only 3
    // IoSlices, and an undersized `dst` limits the output.
    #[test]
    fn chunk_vectored() {
        let mut b = segmented();
        assert_eq!(b.chunks_vectored(&mut []), 0);
        let mut slices = [IoSlice::new(&[]); 5];
        assert_eq!(b.segments(), 4);
        assert_eq!(b.chunks_vectored(&mut slices), 3);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        b.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(b.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    // chunks_vectored must recurse into nested segmented buffers.
    #[test]
    fn chunk_vectored_nested() {
        let mut bufs = [segmented(), segmented()];
        let mut bufs = SegmentedSlice::new(&mut bufs);
        let mut slices = [IoSlice::new(&[]); 10];
        assert_eq!(bufs.chunks_vectored(&mut slices), 6);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        assert_eq!(&*slices[3], b"Hello");
        assert_eq!(&*slices[4], b" ");
        assert_eq!(&*slices[5], b"World");
        bufs.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(bufs.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    // Property-based cross-checks; not run under miri.
    #[cfg(not(miri))]
    mod proptests {

        use super::*;
        use proptest::prelude::*;
        use std::ops::Deref;

        proptest! {
            // Cross-checks SegmentedBuf, SegmentedSlice and a single Bytes
            // concatenation against each other on random inputs and random
            // split sizes, including the FIFO push-while-consuming usage.
            #[test]
            fn random(bufs: Vec<Vec<u8>>, splits in proptest::collection::vec(0..10usize, 1..10)) {
                let concat: Vec<u8> = bufs.iter().flat_map(|b| b.iter()).copied().collect();
                let mut segmented = bufs.iter()
                    .map(|b| &b[..])
                    .collect::<SegmentedBuf<_>>();
                assert_eq!(concat.len(), segmented.remaining());
                assert!(segmented.segments() <= bufs.len());
                assert!(concat.starts_with(segmented.chunk()));
                let mut bytes = segmented.clone().copy_to_bytes(segmented.remaining());
                assert_eq!(&concat[..], &bytes[..]);
                let mut sliced = bufs.iter().map(Deref::deref).collect::<Vec<&[u8]>>();
                let mut sliced = SegmentedSlice::new(&mut sliced);

                let mut fifo = SegmentedBuf::new();
                let mut buf_pos = bufs.iter();

                for split in splits {
                    if !bytes.has_remaining() {
                        break;
                    }
                    let split = cmp::min(bytes.remaining(), split);
                    while fifo.remaining() < split {
                        fifo.push(&buf_pos.next().unwrap()[..]);
                    }
                    let c1 = bytes.copy_to_bytes(split);
                    let c2 = segmented.copy_to_bytes(split);
                    let c3 = sliced.copy_to_bytes(split);
                    assert_eq!(c1, c2);
                    assert_eq!(c1, c3);
                    assert_eq!(bytes.remaining(), segmented.remaining());
                    assert_eq!(bytes.remaining(), sliced.remaining());
                }
            }
        }
    }
}