aws_smithy_types/byte_stream.rs
1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! ByteStream Abstractions
7//!
8//! When the SDK returns streaming binary data, the inner Http Body is
9//! wrapped in [`ByteStream`]. ByteStream provides misuse-resistant primitives
10//! to make it easier to handle common patterns with streaming data.
11//!
12//! # Examples
13//!
14//! ### Writing a ByteStream into a file:
15//! ```no_run
16//! use aws_smithy_types::byte_stream::ByteStream;
17//! use std::error::Error;
18//! use tokio::fs::File;
19//! use tokio::io::AsyncWriteExt;
20//! struct SynthesizeSpeechOutput {
21//! audio_stream: ByteStream,
22//! }
23//!
24//! async fn audio_to_file(
25//! output: SynthesizeSpeechOutput,
26//! ) -> Result<(), Box<dyn Error + Send + Sync>> {
27//! let mut buf = output.audio_stream.collect().await?;
28//! let mut file = File::open("audio.mp3").await?;
29//! file.write_all_buf(&mut buf).await?;
30//! file.flush().await?;
31//! Ok(())
32//! }
33//! ```
34//!
35//! ### Converting a ByteStream into Bytes
36//! ```no_run
37//! use bytes::Bytes;
38//! use aws_smithy_types::byte_stream::ByteStream;
39//! use std::error::Error;
40//! struct SynthesizeSpeechOutput {
41//! audio_stream: ByteStream,
42//! }
43//! async fn load_audio(
44//! output: SynthesizeSpeechOutput,
45//! ) -> Result<Bytes, Box<dyn Error + Send + Sync>> {
46//! Ok(output.audio_stream.collect().await?.into_bytes())
47//! }
48//! ```
49//!
50//! ### Stream a ByteStream into a file
51//! The previous example is recommended in cases where loading the entire file into memory first is desirable. For extremely large
52//! files, you may wish to stream the data directly to the file system, chunk by chunk.
53//! This is possible using the [`.next()`](crate::byte_stream::ByteStream::next) method.
54//!
55//! ```no_run
56//! use bytes::{Buf, Bytes};
57//! use aws_smithy_types::byte_stream::ByteStream;
58//! use std::error::Error;
59//! use tokio::fs::File;
60//! use tokio::io::AsyncWriteExt;
61//! use tokio_stream::StreamExt;
62//! struct SynthesizeSpeechOutput {
63//! audio_stream: ByteStream,
64//! }
65//!
66//! async fn audio_to_file(
67//! output: SynthesizeSpeechOutput,
68//! ) -> Result<(), Box<dyn Error + Send + Sync>> {
69//! let mut file = File::open("audio.mp3").await?;
70//! let mut stream = output.audio_stream;
71//! while let Some(bytes) = stream.next().await {
72//! let bytes: Bytes = bytes?;
73//! file.write_all(&bytes).await?;
74//! }
75//! file.flush().await?;
76//! Ok(())
77//! }
78//! ```
79//!
80//! ### Create a ByteStream from a file
81//!
82//! _Note: This is only available with `rt-tokio` enabled._
83//!
84//! ```no_run
85//! # #[cfg(feature = "rt-tokio")]
86//! # {
87//! use aws_smithy_types::byte_stream::ByteStream;
88//! use std::path::Path;
89//! struct GetObjectInput {
90//! body: ByteStream
91//! }
92//!
93//! async fn bytestream_from_file() -> GetObjectInput {
94//! let bytestream = ByteStream::from_path("docs/some-large-file.csv")
95//! .await
96//! .expect("valid path");
97//! GetObjectInput { body: bytestream }
98//! }
99//! # }
100//! ```
101//!
102//! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file
103//! or the length of the file, use an `FsBuilder`.
104//!
105//! ```no_run
106//! # #[cfg(feature = "rt-tokio")]
107//! # {
108//! use aws_smithy_types::byte_stream::{ByteStream, Length};
109//! use std::path::Path;
110//! struct GetObjectInput {
111//! body: ByteStream
112//! }
113//!
114//! async fn bytestream_from_file() -> GetObjectInput {
115//! let bytestream = ByteStream::read_from().path("docs/some-large-file.csv")
116//! .buffer_size(32_784)
117//! .length(Length::Exact(123_456))
118//! .build()
119//! .await
120//! .expect("valid path");
121//! GetObjectInput { body: bytestream }
122//! }
123//! # }
124//! ```
125
126use crate::body::SdkBody;
127use crate::byte_stream::error::Error;
128use bytes::Buf;
129use bytes::Bytes;
130use bytes_utils::SegmentedBuf;
131use pin_project_lite::pin_project;
132use std::future::poll_fn;
133use std::io::IoSlice;
134use std::pin::Pin;
135use std::task::{Context, Poll};
136
137#[cfg(feature = "rt-tokio")]
138mod bytestream_util;
139#[cfg(feature = "rt-tokio")]
140pub use bytestream_util::Length;
141
142pub mod error;
143
144#[cfg(feature = "rt-tokio")]
145pub use self::bytestream_util::FsBuilder;
146
147/// This module is named after the `http-body` version number since we anticipate
148/// needing to provide equivalent functionality for 1.x of that crate in the future.
149/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
150#[cfg(feature = "http-body-0-4-x")]
151pub mod http_body_0_4_x;
152
153#[cfg(feature = "http-body-1-x")]
154pub mod http_body_1_x;
155
156pin_project! {
157 /// Stream of binary data
158 ///
159 /// `ByteStream` wraps a stream of binary data for ease of use.
160 ///
161 /// ## Getting data out of a `ByteStream`
162 ///
163 /// `ByteStream` provides two primary mechanisms for accessing the data:
164 /// 1. With `.collect()`:
165 ///
166 /// [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`,
167 /// a non-contiguous ByteBuffer.
168 /// ```no_run
169 /// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes};
170 /// use aws_smithy_types::body::SdkBody;
171 /// use bytes::Buf;
172 /// async fn example() {
173 /// let stream = ByteStream::new(SdkBody::from("hello! This is some data"));
174 /// // Load data from the stream into memory:
175 /// let data = stream.collect().await.expect("error reading data");
176 /// // collect returns a `bytes::Buf`:
177 /// println!("first chunk: {:?}", data.chunk());
178 /// }
179 /// ```
180 /// 2. Via [`.next()`](crate::byte_stream::ByteStream::next) or [`.try_next()`](crate::byte_stream::ByteStream::try_next):
181 ///
182 /// For use-cases where holding the entire ByteStream in memory is unnecessary, use the
183 /// `Stream` implementation:
184 /// ```no_run
185 /// # mod crc32 {
186 /// # pub struct Digest { }
187 /// # impl Digest {
188 /// # pub fn new() -> Self { Digest {} }
189 /// # pub fn write(&mut self, b: &[u8]) { }
190 /// # pub fn finish(&self) -> u64 { 6 }
191 /// # }
192 /// # }
193 /// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes, error::Error};
194 /// use aws_smithy_types::body::SdkBody;
195 ///
196 /// async fn example() -> Result<(), Error> {
197 /// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]);
198 /// let mut digest = crc32::Digest::new();
199 /// while let Some(bytes) = stream.try_next().await? {
200 /// digest.write(&bytes);
201 /// }
202 /// println!("digest: {}", digest.finish());
203 /// Ok(())
204 /// }
205 /// ```
206 ///
207 /// 3. Via [`.into_async_read()`](crate::byte_stream::ByteStream::into_async_read):
208 ///
209 /// _Note: The `rt-tokio` feature must be active to use `.into_async_read()`._
210 ///
211 /// It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncBufRead`](tokio::io::AsyncBufRead).
212 /// ```no_run
213 /// use aws_smithy_types::byte_stream::ByteStream;
214 /// use aws_smithy_types::body::SdkBody;
215 /// use tokio::io::AsyncBufReadExt;
216 /// #[cfg(feature = "rt-tokio")]
217 /// async fn example() -> std::io::Result<()> {
218 /// let stream = ByteStream::new(SdkBody::from("hello!\nThis is some data"));
219 /// // Convert the stream to a BufReader
220 /// let buf_reader = stream.into_async_read();
221 /// let mut lines = buf_reader.lines();
222 /// assert_eq!(lines.next_line().await?, Some("hello!".to_owned()));
223 /// assert_eq!(lines.next_line().await?, Some("This is some data".to_owned()));
224 /// assert_eq!(lines.next_line().await?, None);
225 /// Ok(())
226 /// }
227 /// ```
228 ///
229 /// ## Getting data into a ByteStream
230 /// ByteStreams can be created in one of three ways:
231 /// 1. **From in-memory binary data**: ByteStreams created from in-memory data are always retryable. Data
232 /// will be converted into `Bytes` enabling a cheap clone during retries.
233 /// ```no_run
234 /// use bytes::Bytes;
235 /// use aws_smithy_types::byte_stream::ByteStream;
236 /// let stream = ByteStream::from(vec![1,2,3]);
237 /// let stream = ByteStream::from(Bytes::from_static(b"hello!"));
238 /// ```
239 ///
240 /// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs.
241 /// ```no_run
242 /// #[cfg(feature = "tokio-rt")]
243 /// # {
244 /// use aws_smithy_types::byte_stream::ByteStream;
245 /// let stream = ByteStream::from_path("big_file.csv");
246 /// # }
247 /// ```
248 ///
249 /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly
250 /// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
251 /// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
252 /// ```ignore
253 /// # use hyper_0_14 as hyper;
254 /// use aws_smithy_types::byte_stream::ByteStream;
255 /// use aws_smithy_types::body::SdkBody;
256 /// use bytes::Bytes;
257 /// let (mut tx, channel_body) = hyper::Body::channel();
258 /// // this will not be retryable because the SDK has no way to replay this stream
259 /// let stream = ByteStream::new(SdkBody::from_body_0_4(channel_body));
260 /// tx.send_data(Bytes::from_static(b"hello world!"));
261 /// tx.send_data(Bytes::from_static(b"hello again!"));
262 /// // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent
263 /// ```
264 ///
265 #[derive(Debug)]
266 pub struct ByteStream {
267 #[pin]
268 inner: Inner,
269 }
270}
271
272impl ByteStream {
273 /// Create a new `ByteStream` from an [`SdkBody`].
274 pub fn new(body: SdkBody) -> Self {
275 Self {
276 inner: Inner::new(body),
277 }
278 }
279
280 /// Create a new `ByteStream` from a static byte slice.
281 pub fn from_static(bytes: &'static [u8]) -> Self {
282 Self {
283 inner: Inner::new(SdkBody::from(Bytes::from_static(bytes))),
284 }
285 }
286
287 /// Consume the `ByteStream`, returning the wrapped SdkBody.
288 // Backwards compatibility note: Because SdkBody has a dyn variant,
289 // we will always be able to implement this method, even if we stop using
290 // SdkBody as the internal representation
291 pub fn into_inner(self) -> SdkBody {
292 self.inner.body
293 }
294
295 /// Return the next item in the `ByteStream`.
296 ///
297 /// There is also a sibling method [`try_next`](ByteStream::try_next), which returns a `Result<Option<Bytes>, Error>`
298 /// instead of an `Option<Result<Bytes, Error>>`.
299 pub async fn next(&mut self) -> Option<Result<Bytes, Error>> {
300 Some(self.inner.next().await?.map_err(Error::streaming))
301 }
302
303 #[cfg(feature = "byte-stream-poll-next")]
304 /// Attempt to pull out the next value of this stream, returning `None` if the stream is
305 /// exhausted.
306 // This should only be used when one needs to implement a trait method like
307 // `futures_core::stream::Stream::poll_next` on a new-type wrapping a `ByteStream`.
308 // In general, use the `next` method instead.
309 pub fn poll_next(
310 self: Pin<&mut Self>,
311 cx: &mut Context<'_>,
312 ) -> Poll<Option<Result<Bytes, Error>>> {
313 self.project().inner.poll_next(cx).map_err(Error::streaming)
314 }
315
316 /// Consume and return the next item in the `ByteStream` or return an error if an error is
317 /// encountered.
318 ///
319 /// Similar to the [`next`](ByteStream::next) method, but this returns a `Result<Option<Bytes>, Error>` rather than
320 /// an `Option<Result<Bytes, Error>>`, making for easy use with the `?` operator.
321 pub async fn try_next(&mut self) -> Result<Option<Bytes>, Error> {
322 self.next().await.transpose()
323 }
324
325 /// Returns a reference to the data if it is already available in memory
326 pub fn bytes(&self) -> Option<&[u8]> {
327 let Inner { body } = &self.inner;
328 body.bytes()
329 }
330
331 /// Return the bounds on the remaining length of the `ByteStream`.
332 pub fn size_hint(&self) -> (u64, Option<u64>) {
333 self.inner.size_hint()
334 }
335
336 /// Read all the data from this `ByteStream` into memory
337 ///
338 /// If an error in the underlying stream is encountered, `ByteStreamError` is returned.
339 ///
340 /// Data is read into an `AggregatedBytes` that stores data non-contiguously as it was received
341 /// over the network. If a contiguous slice is required, use `into_bytes()`.
342 /// ```no_run
343 /// use bytes::Bytes;
344 /// use aws_smithy_types::body;
345 /// use aws_smithy_types::body::SdkBody;
346 /// use aws_smithy_types::byte_stream::{ByteStream, error::Error};
347 /// async fn get_data() {
348 /// let stream = ByteStream::new(SdkBody::from("hello!"));
349 /// let data: Result<Bytes, Error> = stream.collect().await.map(|data| data.into_bytes());
350 /// }
351 /// ```
352 pub async fn collect(self) -> Result<AggregatedBytes, Error> {
353 self.inner.collect().await.map_err(Error::streaming)
354 }
355
356 /// Returns a [`FsBuilder`], allowing you to build a `ByteStream` with
357 /// full control over how the file is read (eg. specifying the length of
358 /// the file or the size of the buffer used to read the file).
359 ///
360 /// ```no_run
361 /// # #[cfg(feature = "rt-tokio")]
362 /// # {
363 /// use aws_smithy_types::byte_stream::{ByteStream, Length};
364 ///
365 /// async fn bytestream_from_file() -> ByteStream {
366 /// let bytestream = ByteStream::read_from()
367 /// .path("docs/some-large-file.csv")
368 /// // Specify the size of the buffer used to read the file (in bytes, default is 4096)
369 /// .buffer_size(32_784)
370 /// // Specify the length of the file used (skips an additional call to retrieve the size)
371 /// .length(Length::Exact(123_456))
372 /// .build()
373 /// .await
374 /// .expect("valid path");
375 /// bytestream
376 /// }
377 /// # }
378 /// ```
379 #[cfg(feature = "rt-tokio")]
380 pub fn read_from() -> crate::byte_stream::FsBuilder {
381 crate::byte_stream::FsBuilder::new()
382 }
383
384 /// Create a ByteStream that streams data from the filesystem
385 ///
386 /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
387 /// will provide a size hint when used as an HTTP body. If the request fails, the read will
388 /// begin again by reloading the file handle.
389 ///
390 /// ## Warning
391 /// The contents of the file MUST not change during retries. The length & checksum of the file
392 /// will be cached. If the contents of the file change, the operation will almost certainly fail.
393 ///
394 /// Furthermore, a partial write MAY seek in the file and resume from the previous location.
395 ///
396 /// Note: If you want more control, such as specifying the size of the buffer used to read the file
397 /// or the length of the file, use a [`FsBuilder`] as returned from `ByteStream::read_from`.
398 ///
399 /// # Examples
400 /// ```no_run
401 /// use aws_smithy_types::byte_stream::ByteStream;
402 /// use std::path::Path;
403 /// async fn make_bytestream() -> ByteStream {
404 /// ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
405 /// }
406 /// ```
407 #[cfg(feature = "rt-tokio")]
408 pub async fn from_path(
409 path: impl AsRef<std::path::Path>,
410 ) -> Result<Self, crate::byte_stream::error::Error> {
411 crate::byte_stream::FsBuilder::new()
412 .path(path)
413 .build()
414 .await
415 }
416
417 #[cfg(feature = "rt-tokio")]
418 /// Convert this `ByteStream` into a struct that implements [`AsyncBufRead`](tokio::io::AsyncBufRead).
419 ///
420 /// # Example
421 ///
422 /// ```rust
423 /// use tokio::io::AsyncBufReadExt;
424 /// use aws_smithy_types::byte_stream::ByteStream;
425 ///
426 /// # async fn dox(my_bytestream: ByteStream) -> std::io::Result<()> {
427 /// let mut lines = my_bytestream.into_async_read().lines();
428 /// while let Some(line) = lines.next_line().await? {
429 /// // Do something line by line
430 /// }
431 /// # Ok(())
432 /// # }
433 /// ```
434 pub fn into_async_read(self) -> impl tokio::io::AsyncBufRead {
435 // The `Stream` trait is currently unstable so we can only use it in private.
436 // Here, we create a local struct just to enable the trait for `ByteStream` and pass it
437 // to `StreamReader`.
438 struct FuturesStreamCompatByteStream(ByteStream);
439 impl futures_core::stream::Stream for FuturesStreamCompatByteStream {
440 type Item = Result<Bytes, Error>;
441 fn poll_next(
442 mut self: Pin<&mut Self>,
443 cx: &mut Context<'_>,
444 ) -> Poll<Option<Self::Item>> {
445 Pin::new(&mut self.0.inner)
446 .poll_next(cx)
447 .map_err(Error::streaming)
448 }
449 }
450 tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self))
451 }
452
453 /// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`.
454 /// returning a new `Bytestream`.
455 pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Send + Sync + 'static) -> ByteStream {
456 ByteStream::new(self.into_inner().map(f))
457 }
458}
459
460impl Default for ByteStream {
461 fn default() -> Self {
462 Self {
463 inner: Inner {
464 body: SdkBody::from(""),
465 },
466 }
467 }
468}
469
470impl From<SdkBody> for ByteStream {
471 fn from(inp: SdkBody) -> Self {
472 ByteStream::new(inp)
473 }
474}
475
476/// Construct a retryable ByteStream from [`bytes::Bytes`].
477impl From<Bytes> for ByteStream {
478 fn from(input: Bytes) -> Self {
479 ByteStream::new(SdkBody::from(input))
480 }
481}
482
483/// Construct a retryable ByteStream from a `Vec<u8>`.
484///
485/// This will convert the `Vec<u8>` into [`bytes::Bytes`] to enable efficient retries.
486impl From<Vec<u8>> for ByteStream {
487 fn from(input: Vec<u8>) -> Self {
488 Self::from(Bytes::from(input))
489 }
490}
491
492/// Non-contiguous Binary Data Storage
493///
494/// When data is read from the network, it is read in a sequence of chunks that are
495/// not in contiguous memory. [`AggregatedBytes`] provides a view of this data via
496/// [`impl Buf`](bytes::Buf) or it can be copied into contiguous storage with
497/// [`.into_bytes()`](crate::byte_stream::AggregatedBytes::into_bytes).
498#[derive(Debug, Clone)]
499pub struct AggregatedBytes(SegmentedBuf<Bytes>);
500
501impl AggregatedBytes {
502 /// Convert this buffer into [`Bytes`].
503 ///
504 /// # Why does this consume `self`?
505 /// Technically, [`copy_to_bytes`](bytes::Buf::copy_to_bytes) can be called without ownership of self. However, since this
506 /// mutates the underlying buffer such that no data is remaining, it is more misuse resistant to
507 /// prevent the caller from attempting to reread the buffer.
508 ///
509 /// If the caller only holds a mutable reference, they may use [`copy_to_bytes`](bytes::Buf::copy_to_bytes)
510 /// directly on `AggregatedBytes`.
511 pub fn into_bytes(mut self) -> Bytes {
512 self.0.copy_to_bytes(self.0.remaining())
513 }
514
515 /// Convert this buffer into an [`Iterator`] of underlying non-contiguous segments of [`Bytes`]
516 pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
517 self.0.into_inner().into_iter()
518 }
519
520 /// Convert this buffer into a `Vec<u8>`
521 pub fn to_vec(self) -> Vec<u8> {
522 self.0.into_inner().into_iter().flatten().collect()
523 }
524}
525
526impl Buf for AggregatedBytes {
527 // Forward all methods that SegmentedBuf has custom implementations of.
528 fn remaining(&self) -> usize {
529 self.0.remaining()
530 }
531
532 fn chunk(&self) -> &[u8] {
533 self.0.chunk()
534 }
535
536 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
537 self.0.chunks_vectored(dst)
538 }
539
540 fn advance(&mut self, cnt: usize) {
541 self.0.advance(cnt)
542 }
543
544 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
545 self.0.copy_to_bytes(len)
546 }
547}
548
549pin_project! {
550 #[derive(Debug)]
551 struct Inner {
552 #[pin]
553 body: SdkBody,
554 }
555}
556
557impl Inner {
558 fn new(body: SdkBody) -> Self {
559 Self { body }
560 }
561
562 async fn next(&mut self) -> Option<Result<Bytes, crate::body::Error>> {
563 let mut me = Pin::new(self);
564 poll_fn(|cx| me.as_mut().poll_next(cx)).await
565 }
566
567 fn poll_next(
568 self: Pin<&mut Self>,
569 cx: &mut Context<'_>,
570 ) -> Poll<Option<Result<Bytes, crate::body::Error>>> {
571 self.project().body.poll_next(cx)
572 }
573
574 async fn collect(self) -> Result<AggregatedBytes, crate::body::Error> {
575 let mut output = SegmentedBuf::new();
576 let body = self.body;
577 pin_utils::pin_mut!(body);
578 while let Some(buf) = body.next().await {
579 output.push(buf?);
580 }
581 Ok(AggregatedBytes(output))
582 }
583
584 fn size_hint(&self) -> (u64, Option<u64>) {
585 self.body.bounds_on_remaining_length()
586 }
587}
588
589#[cfg(all(test, feature = "rt-tokio"))]
590mod tests {
591 use super::{ByteStream, Inner};
592 use crate::body::SdkBody;
593 use bytes::Bytes;
594 use std::io::Write;
595 use tempfile::NamedTempFile;
596
597 #[tokio::test]
598 async fn read_from_string_body() {
599 let body = SdkBody::from("a simple body");
600 assert_eq!(
601 Inner::new(body)
602 .collect()
603 .await
604 .expect("no errors")
605 .into_bytes(),
606 Bytes::from("a simple body")
607 );
608 }
609
610 #[tokio::test]
611 async fn bytestream_into_async_read() {
612 use tokio::io::AsyncBufReadExt;
613
614 let byte_stream = ByteStream::from_static(b"data 1\ndata 2\ndata 3");
615 let async_buf_read = tokio::io::BufReader::new(byte_stream.into_async_read());
616
617 let mut lines = async_buf_read.lines();
618
619 assert_eq!(lines.next_line().await.unwrap(), Some("data 1".to_owned()));
620 assert_eq!(lines.next_line().await.unwrap(), Some("data 2".to_owned()));
621 assert_eq!(lines.next_line().await.unwrap(), Some("data 3".to_owned()));
622 assert_eq!(lines.next_line().await.unwrap(), None);
623 }
624
625 #[tokio::test]
626 async fn valid_size_hint() {
627 assert_eq!(ByteStream::from_static(b"hello").size_hint().1, Some(5));
628 assert_eq!(ByteStream::from_static(b"").size_hint().1, Some(0));
629
630 let mut f = NamedTempFile::new().unwrap();
631 f.write_all(b"hello").unwrap();
632 let body = ByteStream::from_path(f.path()).await.unwrap();
633 assert_eq!(body.inner.size_hint().1, Some(5));
634
635 let mut f = NamedTempFile::new().unwrap();
636 f.write_all(b"").unwrap();
637 let body = ByteStream::from_path(f.path()).await.unwrap();
638 assert_eq!(body.inner.size_hint().1, Some(0));
639 }
640
641 #[allow(clippy::bool_assert_comparison)]
642 #[tokio::test]
643 async fn valid_eos() {
644 assert_eq!(
645 ByteStream::from_static(b"hello").inner.body.is_end_stream(),
646 false
647 );
648 let mut f = NamedTempFile::new().unwrap();
649 f.write_all(b"hello").unwrap();
650 let body = ByteStream::from_path(f.path()).await.unwrap();
651 assert_eq!(body.inner.body.content_length(), Some(5));
652 assert!(!body.inner.body.is_end_stream());
653
654 assert_eq!(
655 ByteStream::from_static(b"").inner.body.is_end_stream(),
656 true
657 );
658 let mut f = NamedTempFile::new().unwrap();
659 f.write_all(b"").unwrap();
660 let body = ByteStream::from_path(f.path()).await.unwrap();
661 assert_eq!(body.inner.body.content_length(), Some(0));
662 assert!(body.inner.body.is_end_stream());
663 }
664}