aws_smithy_types/
byte_stream.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! ByteStream Abstractions
7//!
8//! When the SDK returns streaming binary data, the inner Http Body is
9//! wrapped in [`ByteStream`]. ByteStream provides misuse-resistant primitives
10//! to make it easier to handle common patterns with streaming data.
11//!
12//! # Examples
13//!
14//! ### Writing a ByteStream into a file:
15//! ```no_run
16//! use aws_smithy_types::byte_stream::ByteStream;
17//! use std::error::Error;
18//! use tokio::fs::File;
19//! use tokio::io::AsyncWriteExt;
20//! struct SynthesizeSpeechOutput {
21//!     audio_stream: ByteStream,
22//! }
23//!
24//! async fn audio_to_file(
25//!     output: SynthesizeSpeechOutput,
26//! ) -> Result<(), Box<dyn Error + Send + Sync>> {
27//!     let mut buf = output.audio_stream.collect().await?;
//!     let mut file = File::create("audio.mp3").await?;
29//!     file.write_all_buf(&mut buf).await?;
30//!     file.flush().await?;
31//!     Ok(())
32//! }
33//! ```
34//!
35//! ### Converting a ByteStream into Bytes
36//! ```no_run
37//! use bytes::Bytes;
38//! use aws_smithy_types::byte_stream::ByteStream;
39//! use std::error::Error;
40//! struct SynthesizeSpeechOutput {
41//!     audio_stream: ByteStream,
42//! }
43//! async fn load_audio(
44//!     output: SynthesizeSpeechOutput,
45//! ) -> Result<Bytes, Box<dyn Error + Send + Sync>> {
46//!     Ok(output.audio_stream.collect().await?.into_bytes())
47//! }
48//! ```
49//!
50//! ### Stream a ByteStream into a file
51//! The previous example is recommended in cases where loading the entire file into memory first is desirable. For extremely large
52//! files, you may wish to stream the data directly to the file system, chunk by chunk.
53//! This is possible using the [`.next()`](crate::byte_stream::ByteStream::next) method.
54//!
55//! ```no_run
56//! use bytes::{Buf, Bytes};
57//! use aws_smithy_types::byte_stream::ByteStream;
58//! use std::error::Error;
59//! use tokio::fs::File;
60//! use tokio::io::AsyncWriteExt;
61//! use tokio_stream::StreamExt;
62//! struct SynthesizeSpeechOutput {
63//!     audio_stream: ByteStream,
64//! }
65//!
66//! async fn audio_to_file(
67//!     output: SynthesizeSpeechOutput,
68//! ) -> Result<(), Box<dyn Error + Send + Sync>> {
//!     let mut file = File::create("audio.mp3").await?;
70//!     let mut stream = output.audio_stream;
71//!     while let Some(bytes) = stream.next().await {
72//!         let bytes: Bytes = bytes?;
73//!         file.write_all(&bytes).await?;
74//!     }
75//!     file.flush().await?;
76//!     Ok(())
77//! }
78//! ```
79//!
80//! ### Create a ByteStream from a file
81//!
82//! _Note: This is only available with `rt-tokio` enabled._
83//!
84//! ```no_run
85//! # #[cfg(feature = "rt-tokio")]
86//! # {
87//! use aws_smithy_types::byte_stream::ByteStream;
88//! use std::path::Path;
89//! struct GetObjectInput {
90//!   body: ByteStream
91//! }
92//!
93//! async fn bytestream_from_file() -> GetObjectInput {
94//!     let bytestream = ByteStream::from_path("docs/some-large-file.csv")
95//!         .await
96//!         .expect("valid path");
97//!     GetObjectInput { body: bytestream }
98//! }
99//! # }
100//! ```
101//!
102//! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file
103//! or the length of the file, use an `FsBuilder`.
104//!
105//! ```no_run
106//! # #[cfg(feature = "rt-tokio")]
107//! # {
108//! use aws_smithy_types::byte_stream::{ByteStream, Length};
109//! use std::path::Path;
110//! struct GetObjectInput {
111//!     body: ByteStream
112//! }
113//!
114//! async fn bytestream_from_file() -> GetObjectInput {
115//!     let bytestream = ByteStream::read_from().path("docs/some-large-file.csv")
116//!         .buffer_size(32_784)
117//!         .length(Length::Exact(123_456))
118//!         .build()
119//!         .await
120//!         .expect("valid path");
121//!     GetObjectInput { body: bytestream }
122//! }
123//! # }
124//! ```
125
126use crate::body::SdkBody;
127use crate::byte_stream::error::Error;
128use bytes::Buf;
129use bytes::Bytes;
130use bytes_utils::SegmentedBuf;
131use pin_project_lite::pin_project;
132use std::future::poll_fn;
133use std::io::IoSlice;
134use std::pin::Pin;
135use std::task::{Context, Poll};
136
137#[cfg(feature = "rt-tokio")]
138mod bytestream_util;
139#[cfg(feature = "rt-tokio")]
140pub use bytestream_util::Length;
141
142pub mod error;
143
144#[cfg(feature = "rt-tokio")]
145pub use self::bytestream_util::FsBuilder;
146
147/// This module is named after the `http-body` version number since we anticipate
148/// needing to provide equivalent functionality for 1.x of that crate in the future.
149/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
150#[cfg(feature = "http-body-0-4-x")]
151pub mod http_body_0_4_x;
152
153#[cfg(feature = "http-body-1-x")]
154pub mod http_body_1_x;
155
pin_project! {
    /// Stream of binary data
    ///
    /// `ByteStream` wraps a stream of binary data for ease of use.
    ///
    /// ## Getting data out of a `ByteStream`
    ///
    /// `ByteStream` provides three mechanisms for accessing the data:
    /// 1. With `.collect()`:
    ///
    ///     [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`,
    ///     a non-contiguous ByteBuffer.
    ///     ```no_run
    ///     use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes};
    ///     use aws_smithy_types::body::SdkBody;
    ///     use bytes::Buf;
    ///     async fn example() {
    ///        let stream = ByteStream::new(SdkBody::from("hello! This is some data"));
    ///        // Load data from the stream into memory:
    ///        let data = stream.collect().await.expect("error reading data");
    ///        // collect returns a `bytes::Buf`:
    ///        println!("first chunk: {:?}", data.chunk());
    ///     }
    ///     ```
    /// 2. Via [`.next()`](crate::byte_stream::ByteStream::next) or [`.try_next()`](crate::byte_stream::ByteStream::try_next):
    ///
    ///     For use-cases where holding the entire ByteStream in memory is unnecessary, read the
    ///     stream one chunk at a time:
    ///     ```no_run
    ///     # mod crc32 {
    ///     #   pub struct Digest { }
    ///     #   impl Digest {
    ///     #       pub fn new() -> Self { Digest {} }
    ///     #       pub fn write(&mut self, b: &[u8]) { }
    ///     #       pub fn finish(&self) -> u64 { 6 }
    ///     #   }
    ///     # }
    ///     use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes, error::Error};
    ///     use aws_smithy_types::body::SdkBody;
    ///
    ///     async fn example() -> Result<(), Error> {
    ///        let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]);
    ///        let mut digest = crc32::Digest::new();
    ///        while let Some(bytes) = stream.try_next().await? {
    ///            digest.write(&bytes);
    ///        }
    ///        println!("digest: {}", digest.finish());
    ///        Ok(())
    ///     }
    ///     ```
    ///
    /// 3. Via [`.into_async_read()`](crate::byte_stream::ByteStream::into_async_read):
    ///
    ///     _Note: The `rt-tokio` feature must be active to use `.into_async_read()`._
    ///
    ///     It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncBufRead`](tokio::io::AsyncBufRead).
    ///     ```no_run
    ///     use aws_smithy_types::byte_stream::ByteStream;
    ///     use aws_smithy_types::body::SdkBody;
    ///     use tokio::io::AsyncBufReadExt;
    ///     #[cfg(feature = "rt-tokio")]
    ///     async fn example() -> std::io::Result<()> {
    ///        let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data"));
    ///        // Convert the stream to a BufReader
    ///        let buf_reader = stream.into_async_read();
    ///        let mut lines = buf_reader.lines();
    ///        assert_eq!(lines.next_line().await?, Some("hello!".to_owned()));
    ///        assert_eq!(lines.next_line().await?, Some("This is some data".to_owned()));
    ///        assert_eq!(lines.next_line().await?, None);
    ///        Ok(())
    ///     }
    ///     ```
    ///
    /// ## Getting data into a ByteStream
    /// ByteStreams can be created in one of three ways:
    /// 1. **From in-memory binary data**: ByteStreams created from in-memory data are always retryable. Data
    ///    will be converted into `Bytes` enabling a cheap clone during retries.
    ///     ```no_run
    ///     use bytes::Bytes;
    ///     use aws_smithy_types::byte_stream::ByteStream;
    ///     let stream = ByteStream::from(vec![1,2,3]);
    ///     let stream = ByteStream::from(Bytes::from_static(b"hello!"));
    ///     ```
    ///
    /// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs.
    ///
    ///     _Note: This is only available with `rt-tokio` enabled._
    ///     ```no_run
    ///     # #[cfg(feature = "rt-tokio")]
    ///     # {
    ///     use aws_smithy_types::byte_stream::ByteStream;
    ///     let stream = ByteStream::from_path("big_file.csv");
    ///     # }
    ///     ```
    ///
    /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly
    ///    from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
    ///    when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
    ///     ```ignore
    ///     # use hyper_0_14 as hyper;
    ///     use aws_smithy_types::byte_stream::ByteStream;
    ///     use aws_smithy_types::body::SdkBody;
    ///     use bytes::Bytes;
    ///     let (mut tx, channel_body) = hyper::Body::channel();
    ///     // this will not be retryable because the SDK has no way to replay this stream
    ///     let stream = ByteStream::new(SdkBody::from_body_0_4(channel_body));
    ///     tx.send_data(Bytes::from_static(b"hello world!"));
    ///     tx.send_data(Bytes::from_static(b"hello again!"));
    ///     // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent
    ///     ```
    ///
    #[derive(Debug)]
    pub struct ByteStream {
        // Private wrapper holding the `SdkBody`; every public method delegates to it.
        #[pin]
        inner: Inner,
    }
}
271
272impl ByteStream {
273    /// Create a new `ByteStream` from an [`SdkBody`].
274    pub fn new(body: SdkBody) -> Self {
275        Self {
276            inner: Inner::new(body),
277        }
278    }
279
280    /// Create a new `ByteStream` from a static byte slice.
281    pub fn from_static(bytes: &'static [u8]) -> Self {
282        Self {
283            inner: Inner::new(SdkBody::from(Bytes::from_static(bytes))),
284        }
285    }
286
287    /// Consume the `ByteStream`, returning the wrapped SdkBody.
288    // Backwards compatibility note: Because SdkBody has a dyn variant,
289    // we will always be able to implement this method, even if we stop using
290    // SdkBody as the internal representation
291    pub fn into_inner(self) -> SdkBody {
292        self.inner.body
293    }
294
295    /// Return the next item in the `ByteStream`.
296    ///
297    /// There is also a sibling method [`try_next`](ByteStream::try_next), which returns a `Result<Option<Bytes>, Error>`
298    /// instead of an `Option<Result<Bytes, Error>>`.
299    pub async fn next(&mut self) -> Option<Result<Bytes, Error>> {
300        Some(self.inner.next().await?.map_err(Error::streaming))
301    }
302
303    #[cfg(feature = "byte-stream-poll-next")]
304    /// Attempt to pull out the next value of this stream, returning `None` if the stream is
305    /// exhausted.
306    // This should only be used when one needs to implement a trait method like
307    // `futures_core::stream::Stream::poll_next` on a new-type wrapping a `ByteStream`.
308    // In general, use the `next` method instead.
309    pub fn poll_next(
310        self: Pin<&mut Self>,
311        cx: &mut Context<'_>,
312    ) -> Poll<Option<Result<Bytes, Error>>> {
313        self.project().inner.poll_next(cx).map_err(Error::streaming)
314    }
315
316    /// Consume and return the next item in the `ByteStream` or return an error if an error is
317    /// encountered.
318    ///
319    /// Similar to the [`next`](ByteStream::next) method, but this returns a `Result<Option<Bytes>, Error>` rather than
320    /// an `Option<Result<Bytes, Error>>`, making for easy use with the `?` operator.
321    pub async fn try_next(&mut self) -> Result<Option<Bytes>, Error> {
322        self.next().await.transpose()
323    }
324
325    /// Returns a reference to the data if it is already available in memory
326    pub fn bytes(&self) -> Option<&[u8]> {
327        let Inner { body } = &self.inner;
328        body.bytes()
329    }
330
331    /// Return the bounds on the remaining length of the `ByteStream`.
332    pub fn size_hint(&self) -> (u64, Option<u64>) {
333        self.inner.size_hint()
334    }
335
336    /// Read all the data from this `ByteStream` into memory
337    ///
338    /// If an error in the underlying stream is encountered, `ByteStreamError` is returned.
339    ///
340    /// Data is read into an `AggregatedBytes` that stores data non-contiguously as it was received
341    /// over the network. If a contiguous slice is required, use `into_bytes()`.
342    /// ```no_run
343    /// use bytes::Bytes;
344    /// use aws_smithy_types::body;
345    /// use aws_smithy_types::body::SdkBody;
346    /// use aws_smithy_types::byte_stream::{ByteStream, error::Error};
347    /// async fn get_data() {
348    ///     let stream = ByteStream::new(SdkBody::from("hello!"));
349    ///     let data: Result<Bytes, Error> = stream.collect().await.map(|data| data.into_bytes());
350    /// }
351    /// ```
352    pub async fn collect(self) -> Result<AggregatedBytes, Error> {
353        self.inner.collect().await.map_err(Error::streaming)
354    }
355
356    /// Returns a [`FsBuilder`], allowing you to build a `ByteStream` with
357    /// full control over how the file is read (eg. specifying the length of
358    /// the file or the size of the buffer used to read the file).
359    ///
360    /// ```no_run
361    /// # #[cfg(feature = "rt-tokio")]
362    /// # {
363    /// use aws_smithy_types::byte_stream::{ByteStream, Length};
364    ///
365    /// async fn bytestream_from_file() -> ByteStream {
366    ///     let bytestream = ByteStream::read_from()
367    ///         .path("docs/some-large-file.csv")
368    ///         // Specify the size of the buffer used to read the file (in bytes, default is 4096)
369    ///         .buffer_size(32_784)
370    ///         // Specify the length of the file used (skips an additional call to retrieve the size)
371    ///         .length(Length::Exact(123_456))
372    ///         .build()
373    ///         .await
374    ///         .expect("valid path");
375    ///     bytestream
376    /// }
377    /// # }
378    /// ```
379    #[cfg(feature = "rt-tokio")]
380    pub fn read_from() -> crate::byte_stream::FsBuilder {
381        crate::byte_stream::FsBuilder::new()
382    }
383
384    /// Create a ByteStream that streams data from the filesystem
385    ///
386    /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
387    /// will provide a size hint when used as an HTTP body. If the request fails, the read will
388    /// begin again by reloading the file handle.
389    ///
390    /// ## Warning
391    /// The contents of the file MUST not change during retries. The length & checksum of the file
392    /// will be cached. If the contents of the file change, the operation will almost certainly fail.
393    ///
394    /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
395    ///
396    /// Note: If you want more control, such as specifying the size of the buffer used to read the file
397    /// or the length of the file, use a [`FsBuilder`] as returned from `ByteStream::read_from`.
398    ///
399    /// # Examples
400    /// ```no_run
401    /// use aws_smithy_types::byte_stream::ByteStream;
402    /// use std::path::Path;
403    ///  async fn make_bytestream() -> ByteStream {
404    ///     ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
405    /// }
406    /// ```
407    #[cfg(feature = "rt-tokio")]
408    pub async fn from_path(
409        path: impl AsRef<std::path::Path>,
410    ) -> Result<Self, crate::byte_stream::error::Error> {
411        crate::byte_stream::FsBuilder::new()
412            .path(path)
413            .build()
414            .await
415    }
416
417    #[cfg(feature = "rt-tokio")]
418    /// Convert this `ByteStream` into a struct that implements [`AsyncBufRead`](tokio::io::AsyncBufRead).
419    ///
420    /// # Example
421    ///
422    /// ```rust
423    /// use tokio::io::AsyncBufReadExt;
424    /// use aws_smithy_types::byte_stream::ByteStream;
425    ///
426    /// # async fn dox(my_bytestream: ByteStream) -> std::io::Result<()> {
427    /// let mut lines =  my_bytestream.into_async_read().lines();
428    /// while let Some(line) = lines.next_line().await? {
429    ///   // Do something line by line
430    /// }
431    /// # Ok(())
432    /// # }
433    /// ```
434    pub fn into_async_read(self) -> impl tokio::io::AsyncBufRead {
435        // The `Stream` trait is currently unstable so we can only use it in private.
436        // Here, we create a local struct just to enable the trait for `ByteStream` and pass it
437        // to `StreamReader`.
438        struct FuturesStreamCompatByteStream(ByteStream);
439        impl futures_core::stream::Stream for FuturesStreamCompatByteStream {
440            type Item = Result<Bytes, Error>;
441            fn poll_next(
442                mut self: Pin<&mut Self>,
443                cx: &mut Context<'_>,
444            ) -> Poll<Option<Self::Item>> {
445                Pin::new(&mut self.0.inner)
446                    .poll_next(cx)
447                    .map_err(Error::streaming)
448            }
449        }
450        tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self))
451    }
452
453    /// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`.
454    /// returning a new `Bytestream`.
455    pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Send + Sync + 'static) -> ByteStream {
456        ByteStream::new(self.into_inner().map(f))
457    }
458}
459
460impl Default for ByteStream {
461    fn default() -> Self {
462        Self {
463            inner: Inner {
464                body: SdkBody::from(""),
465            },
466        }
467    }
468}
469
470impl From<SdkBody> for ByteStream {
471    fn from(inp: SdkBody) -> Self {
472        ByteStream::new(inp)
473    }
474}
475
476/// Construct a retryable ByteStream from [`bytes::Bytes`].
477impl From<Bytes> for ByteStream {
478    fn from(input: Bytes) -> Self {
479        ByteStream::new(SdkBody::from(input))
480    }
481}
482
483/// Construct a retryable ByteStream from a `Vec<u8>`.
484///
485/// This will convert the `Vec<u8>` into [`bytes::Bytes`] to enable efficient retries.
486impl From<Vec<u8>> for ByteStream {
487    fn from(input: Vec<u8>) -> Self {
488        Self::from(Bytes::from(input))
489    }
490}
491
/// Non-contiguous Binary Data Storage
///
/// When data is read from the network, it is read in a sequence of chunks that are
/// not in contiguous memory. [`AggregatedBytes`] provides a view of this data via
/// [`impl Buf`](bytes::Buf) or it can be copied into contiguous storage with
/// [`.into_bytes()`](crate::byte_stream::AggregatedBytes::into_bytes).
///
/// Values of this type are produced by [`ByteStream::collect`].
// The single field is the `SegmentedBuf` holding the received `Bytes` chunks in order.
#[derive(Debug, Clone)]
pub struct AggregatedBytes(SegmentedBuf<Bytes>);
500
501impl AggregatedBytes {
502    /// Convert this buffer into [`Bytes`].
503    ///
504    /// # Why does this consume `self`?
505    /// Technically, [`copy_to_bytes`](bytes::Buf::copy_to_bytes) can be called without ownership of self. However, since this
506    /// mutates the underlying buffer such that no data is remaining, it is more misuse resistant to
507    /// prevent the caller from attempting to reread the buffer.
508    ///
509    /// If the caller only holds a mutable reference, they may use [`copy_to_bytes`](bytes::Buf::copy_to_bytes)
510    /// directly on `AggregatedBytes`.
511    pub fn into_bytes(mut self) -> Bytes {
512        self.0.copy_to_bytes(self.0.remaining())
513    }
514
515    /// Convert this buffer into an [`Iterator`] of underlying non-contiguous segments of [`Bytes`]
516    pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
517        self.0.into_inner().into_iter()
518    }
519
520    /// Convert this buffer into a `Vec<u8>`
521    pub fn to_vec(self) -> Vec<u8> {
522        self.0.into_inner().into_iter().flatten().collect()
523    }
524}
525
526impl Buf for AggregatedBytes {
527    // Forward all methods that SegmentedBuf has custom implementations of.
528    fn remaining(&self) -> usize {
529        self.0.remaining()
530    }
531
532    fn chunk(&self) -> &[u8] {
533        self.0.chunk()
534    }
535
536    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
537        self.0.chunks_vectored(dst)
538    }
539
540    fn advance(&mut self, cnt: usize) {
541        self.0.advance(cnt)
542    }
543
544    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
545        self.0.copy_to_bytes(len)
546    }
547}
548
pin_project! {
    /// Private wrapper around the [`SdkBody`] that backs a [`ByteStream`].
    ///
    /// The chunk-reading, collecting and size-hint logic used by `ByteStream` lives on this type.
    #[derive(Debug)]
    struct Inner {
        // The wrapped body; pinned so it can be polled through `project()`.
        #[pin]
        body: SdkBody,
    }
}
556
557impl Inner {
558    fn new(body: SdkBody) -> Self {
559        Self { body }
560    }
561
562    async fn next(&mut self) -> Option<Result<Bytes, crate::body::Error>> {
563        let mut me = Pin::new(self);
564        poll_fn(|cx| me.as_mut().poll_next(cx)).await
565    }
566
567    fn poll_next(
568        self: Pin<&mut Self>,
569        cx: &mut Context<'_>,
570    ) -> Poll<Option<Result<Bytes, crate::body::Error>>> {
571        self.project().body.poll_next(cx)
572    }
573
574    async fn collect(self) -> Result<AggregatedBytes, crate::body::Error> {
575        let mut output = SegmentedBuf::new();
576        let body = self.body;
577        pin_utils::pin_mut!(body);
578        while let Some(buf) = body.next().await {
579            output.push(buf?);
580        }
581        Ok(AggregatedBytes(output))
582    }
583
584    fn size_hint(&self) -> (u64, Option<u64>) {
585        self.body.bounds_on_remaining_length()
586    }
587}
588
#[cfg(all(test, feature = "rt-tokio"))]
mod tests {
    use super::{ByteStream, Inner};
    use crate::body::SdkBody;
    use bytes::Bytes;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file containing exactly `contents`.
    ///
    /// The returned handle must be kept alive for as long as the file is needed.
    fn temp_file_with(contents: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    /// Collecting an in-memory body yields exactly the bytes it was built from.
    #[tokio::test]
    async fn read_from_string_body() {
        let collected = Inner::new(SdkBody::from("a simple body"))
            .collect()
            .await
            .expect("no errors");
        assert_eq!(collected.into_bytes(), Bytes::from("a simple body"));
    }

    /// The reader returned by `into_async_read` can be consumed line by line.
    #[tokio::test]
    async fn bytestream_into_async_read() {
        use tokio::io::AsyncBufReadExt;

        let stream = ByteStream::from_static(b"data 1\ndata 2\ndata 3");
        let reader = tokio::io::BufReader::new(stream.into_async_read());
        let mut lines = reader.lines();

        for expected in ["data 1", "data 2", "data 3"] {
            assert_eq!(lines.next_line().await.unwrap(), Some(expected.to_owned()));
        }
        assert_eq!(lines.next_line().await.unwrap(), None);
    }

    /// The upper size bound is exact for static data and for files, including empty ones.
    #[tokio::test]
    async fn valid_size_hint() {
        assert_eq!(ByteStream::from_static(b"hello").size_hint().1, Some(5));
        assert_eq!(ByteStream::from_static(b"").size_hint().1, Some(0));

        let file = temp_file_with(b"hello");
        let stream = ByteStream::from_path(file.path()).await.unwrap();
        assert_eq!(stream.inner.size_hint().1, Some(5));

        let file = temp_file_with(b"");
        let stream = ByteStream::from_path(file.path()).await.unwrap();
        assert_eq!(stream.inner.size_hint().1, Some(0));
    }

    /// Only empty bodies report end-of-stream before anything has been read.
    #[tokio::test]
    async fn valid_eos() {
        let non_empty = ByteStream::from_static(b"hello");
        assert!(!non_empty.inner.body.is_end_stream());

        let file = temp_file_with(b"hello");
        let stream = ByteStream::from_path(file.path()).await.unwrap();
        assert_eq!(stream.inner.body.content_length(), Some(5));
        assert!(!stream.inner.body.is_end_stream());

        let empty = ByteStream::from_static(b"");
        assert!(empty.inner.body.is_end_stream());

        let file = temp_file_with(b"");
        let stream = ByteStream::from_path(file.path()).await.unwrap();
        assert_eq!(stream.inner.body.content_length(), Some(0));
        assert!(stream.inner.body.is_end_stream());
    }
}
664}