aws_smithy_http/
futures_stream_adapter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use aws_smithy_types::body::SdkBody;
7use aws_smithy_types::byte_stream::error::Error as ByteStreamError;
8use aws_smithy_types::byte_stream::ByteStream;
9use bytes::Bytes;
10use futures_core::stream::Stream;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A new-type wrapper to enable the impl of the `futures_core::stream::Stream` trait
15///
16/// [`ByteStream`] no longer implements `futures_core::stream::Stream` so we wrap it in the
17/// new-type to enable the trait when it is required.
18///
19/// This is meant to be used by codegen code, and users should not need to use it directly.
20#[derive(Debug)]
21pub struct FuturesStreamCompatByteStream(ByteStream);
22
23impl FuturesStreamCompatByteStream {
24    /// Creates a new `FuturesStreamCompatByteStream` by wrapping `stream`.
25    pub fn new(stream: ByteStream) -> Self {
26        Self(stream)
27    }
28
29    /// Returns [`SdkBody`] of the wrapped [`ByteStream`].
30    pub fn into_inner(self) -> SdkBody {
31        self.0.into_inner()
32    }
33}
34
35impl Stream for FuturesStreamCompatByteStream {
36    type Item = Result<Bytes, ByteStreamError>;
37
38    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
39        Pin::new(&mut self.0).poll_next(cx)
40    }
41}
42
#[cfg(test)]
mod tests {
    use super::*;
    use futures_core::stream::Stream;

    // Compile-time probe: only accepts streams that satisfy the bounds below and hands the
    // stream back untouched. Going by the helper's name, the bounds are meant to mirror what
    // hyper's `wrap_stream` expects of its argument.
    fn check_compatible_with_hyper_wrap_stream<
        S: Stream<Item = Result<O, E>> + Send + 'static,
        O: Into<Bytes> + 'static,
        E: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + 'static,
    >(
        stream: S,
    ) -> S {
        stream
    }

    #[test]
    fn test_byte_stream_stream_can_be_made_compatible_with_hyper_wrap_stream() {
        // The test passes as long as this type-checks; nothing is polled at runtime.
        let byte_stream = ByteStream::from_static(b"Hello world");
        let adapted = FuturesStreamCompatByteStream::new(byte_stream);
        check_compatible_with_hyper_wrap_stream(adapted);
    }
}