aws_smithy_types/
body.rs

/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

//! Types for representing the body of an HTTP request or response
7
8use bytes::Bytes;
9use pin_project_lite::pin_project;
10use std::error::Error as StdError;
11use std::fmt::{self, Debug, Formatter};
12use std::future::poll_fn;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
17/// This module is named after the `http-body` version number since we anticipate
18/// needing to provide equivalent functionality for 1.x of that crate in the future.
19/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
20#[cfg(feature = "http-body-0-4-x")]
21pub mod http_body_0_4_x;
22#[cfg(feature = "http-body-1-x")]
23pub mod http_body_1_x;
24
/// A type-erased, boxed error that is both `Send` and `Sync`
pub type Error = Box<dyn StdError + Send + Sync + 'static>;
27
28pin_project! {
29    /// SdkBody type
30    ///
31    /// This is the Body used for dispatching all HTTP Requests.
32    /// For handling responses, the type of the body will be controlled
33    /// by the HTTP stack.
34    ///
35    pub struct SdkBody {
36        #[pin]
37        inner: Inner,
38        // An optional function to recreate the inner body
39        //
40        // In the event of retry, this function will be called to generate a new body. See
41        // [`try_clone()`](SdkBody::try_clone)
42        rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
43        bytes_contents: Option<Bytes>
44    }
45}
46
47impl Debug for SdkBody {
48    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49        f.debug_struct("SdkBody")
50            .field("inner", &self.inner)
51            .field("retryable", &self.rebuild.is_some())
52            .finish()
53    }
54}
55
/// A type-erased HTTP body that produces [`Bytes`] when consumed, or fails with an [`Error`].
enum BoxBody {
    // Gated on the **dependency** being present rather than on the feature alone. That way the
    // variant can be constructed whenever the dependency is available, while the APIs that
    // build it stay private.
    #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x", feature = "rt-tokio"))]
    // The payload is dead code with `--no-default-features --features rt-tokio`
    HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),
}
68
69pin_project! {
70    #[project = InnerProj]
71    enum Inner {
72        // An in-memory body
73        Once {
74            inner: Option<Bytes>
75        },
76        // A streaming body
77        Dyn {
78            #[pin]
79            inner: BoxBody,
80        },
81
82        /// When a streaming body is transferred out to a stream parser, the body is replaced with
83        /// `Taken`. This will return an Error when polled. Attempting to read data out of a `Taken`
84        /// Body is a bug.
85        Taken,
86    }
87}
88
89impl Debug for Inner {
90    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91        match &self {
92            Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
93            Inner::Dyn { .. } => write!(f, "BoxBody"),
94            Inner::Taken => f.debug_tuple("Taken").finish(),
95        }
96    }
97}
98
99impl SdkBody {
100    /// Construct an explicitly retryable SDK body
101    ///
102    /// _Note: This is probably not what you want_
103    ///
104    /// All bodies constructed from in-memory data (`String`, `Vec<u8>`, `Bytes`, etc.) will be
105    /// retryable out of the box. If you want to read data from a file, you should use
106    /// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function
107    /// is only necessary when you need to enable retries for your own streaming container.
108    pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
109        let initial = f();
110        SdkBody {
111            inner: initial.inner,
112            rebuild: Some(Arc::new(move || f().inner)),
113            bytes_contents: initial.bytes_contents,
114        }
115    }
116
117    /// When an SdkBody is read, the inner data must be consumed. In order to do this, the SdkBody
118    /// is swapped with a "taken" body. This "taken" body cannot be read but aids in debugging.
119    pub fn taken() -> Self {
120        Self {
121            inner: Inner::Taken,
122            rebuild: None,
123            bytes_contents: None,
124        }
125    }
126
127    /// Create an empty SdkBody for requests and responses that don't transfer any data in the body.
128    pub fn empty() -> Self {
129        Self {
130            inner: Inner::Once { inner: None },
131            rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
132            bytes_contents: Some(Bytes::new()),
133        }
134    }
135
136    pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
137        let mut me = Pin::new(self);
138        poll_fn(|cx| me.as_mut().poll_next(cx)).await
139    }
140
141    pub(crate) fn poll_next(
142        self: Pin<&mut Self>,
143        #[allow(unused)] cx: &mut Context<'_>,
144    ) -> Poll<Option<Result<Bytes, Error>>> {
145        let this = self.project();
146        match this.inner.project() {
147            InnerProj::Once { ref mut inner } => {
148                let data = inner.take();
149                match data {
150                    Some(bytes) if bytes.is_empty() => Poll::Ready(None),
151                    Some(bytes) => Poll::Ready(Some(Ok(bytes))),
152                    None => Poll::Ready(None),
153                }
154            }
155            InnerProj::Dyn { inner: body } => match body.get_mut() {
156                #[cfg(feature = "http-body-0-4-x")]
157                BoxBody::HttpBody04(box_body) => {
158                    use http_body_0_4::Body;
159                    Pin::new(box_body).poll_data(cx)
160                }
161                #[allow(unreachable_patterns)]
162                _ => unreachable!(
163                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
164                ),
165            },
166            InnerProj::Taken => {
167                Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
168            }
169        }
170    }
171
172    #[cfg(any(
173        feature = "http-body-0-4-x",
174        feature = "http-body-1-x",
175        feature = "rt-tokio"
176    ))]
177    pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
178    where
179        T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
180        E: Into<Error> + 'static,
181    {
182        Self {
183            inner: Inner::Dyn {
184                inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
185                    body.map_err(Into::into),
186                )),
187            },
188            rebuild: None,
189            bytes_contents: None,
190        }
191    }
192
193    #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x",))]
194    pub(crate) fn poll_next_trailers(
195        self: Pin<&mut Self>,
196        cx: &mut Context<'_>,
197    ) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Error>> {
198        let this = self.project();
199        match this.inner.project() {
200            InnerProj::Once { .. } => Poll::Ready(Ok(None)),
201            InnerProj::Dyn { inner } => match inner.get_mut() {
202                BoxBody::HttpBody04(box_body) => {
203                    use http_body_0_4::Body;
204                    Pin::new(box_body).poll_trailers(cx)
205                }
206            },
207            InnerProj::Taken => Poll::Ready(Err(
208                "A `Taken` body should never be polled for trailers".into(),
209            )),
210        }
211    }
212
213    /// If possible, return a reference to this body as `&[u8]`
214    ///
215    /// If this SdkBody is NOT streaming, this will return the byte slab
216    /// If this SdkBody is streaming, this will return `None`
217    pub fn bytes(&self) -> Option<&[u8]> {
218        match &self.bytes_contents {
219            Some(b) => Some(b),
220            None => None,
221        }
222    }
223
224    /// Attempt to clone this SdkBody. This will fail if the inner data is not cloneable, such as when
225    /// it is a single-use stream that can't be recreated.
226    pub fn try_clone(&self) -> Option<Self> {
227        self.rebuild.as_ref().map(|rebuild| {
228            let next = rebuild();
229            Self {
230                inner: next,
231                rebuild: self.rebuild.clone(),
232                bytes_contents: self.bytes_contents.clone(),
233            }
234        })
235    }
236
237    /// Return `true` if this SdkBody is streaming, `false` if it is in-memory.
238    pub fn is_streaming(&self) -> bool {
239        matches!(self.inner, Inner::Dyn { .. })
240    }
241
242    /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
243    /// have a known length.
244    pub fn content_length(&self) -> Option<u64> {
245        match self.bounds_on_remaining_length() {
246            (lo, Some(hi)) if lo == hi => Some(lo),
247            _ => None,
248        }
249    }
250
251    #[allow(dead_code)] // used by a feature-gated `http-body`'s trait method
252    pub(crate) fn is_end_stream(&self) -> bool {
253        match &self.inner {
254            Inner::Once { inner: None } => true,
255            Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
256            Inner::Dyn { inner: box_body } => match box_body {
257                #[cfg(feature = "http-body-0-4-x")]
258                BoxBody::HttpBody04(box_body) => {
259                    use http_body_0_4::Body;
260                    box_body.is_end_stream()
261                }
262                #[allow(unreachable_patterns)]
263                _ => unreachable!(
264                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
265                ),
266            },
267            Inner::Taken => true,
268        }
269    }
270
271    pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
272        match &self.inner {
273            Inner::Once { inner: None } => (0, Some(0)),
274            Inner::Once { inner: Some(bytes) } => {
275                let len = bytes.len() as u64;
276                (len, Some(len))
277            }
278            Inner::Dyn { inner: box_body } => match box_body {
279                #[cfg(feature = "http-body-0-4-x")]
280                BoxBody::HttpBody04(box_body) => {
281                    use http_body_0_4::Body;
282                    let hint = box_body.size_hint();
283                    (hint.lower(), hint.upper())
284                }
285                #[allow(unreachable_patterns)]
286                _ => unreachable!(
287                    "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
288                ),
289            },
290            Inner::Taken => (0, Some(0)),
291        }
292    }
293
294    /// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before
295    /// returning the result.
296    pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
297        if self.rebuild.is_some() {
298            SdkBody::retryable(move || f(self.try_clone().unwrap()))
299        } else {
300            f(self)
301        }
302    }
303
304    /// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
305    ///
306    /// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
307    /// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
308    /// instead.
309    pub fn map_preserve_contents(
310        self,
311        f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
312    ) -> SdkBody {
313        let contents = self.bytes_contents.clone();
314        let mut out = if self.rebuild.is_some() {
315            SdkBody::retryable(move || f(self.try_clone().unwrap()))
316        } else {
317            f(self)
318        };
319        out.bytes_contents = contents;
320        out
321    }
322}
323
324impl From<&str> for SdkBody {
325    fn from(s: &str) -> Self {
326        Self::from(s.as_bytes())
327    }
328}
329
330impl From<Bytes> for SdkBody {
331    fn from(bytes: Bytes) -> Self {
332        let b = bytes.clone();
333        SdkBody {
334            inner: Inner::Once {
335                inner: Some(bytes.clone()),
336            },
337            rebuild: Some(Arc::new(move || Inner::Once {
338                inner: Some(bytes.clone()),
339            })),
340            bytes_contents: Some(b),
341        }
342    }
343}
344
345impl From<Vec<u8>> for SdkBody {
346    fn from(data: Vec<u8>) -> Self {
347        Self::from(Bytes::from(data))
348    }
349}
350
351impl From<String> for SdkBody {
352    fn from(s: String) -> Self {
353        Self::from(s.into_bytes())
354    }
355}
356
357impl From<&[u8]> for SdkBody {
358    fn from(data: &[u8]) -> Self {
359        Self::from(Bytes::copy_from_slice(data))
360    }
361}
362
#[cfg(test)]
mod test {
    use crate::body::SdkBody;
    use std::pin::Pin;

    // In-memory bodies report an exact content length, including the empty body.
    #[test]
    fn valid_size_hint() {
        for (input, expected_len) in [("hello", 5), ("", 0)] {
            assert_eq!(SdkBody::from(input).content_length(), Some(expected_len));
        }
    }

    // Only an empty in-memory body starts out at end-of-stream.
    #[test]
    fn valid_eos() {
        assert!(!SdkBody::from("hello").is_end_stream());
        assert!(SdkBody::from("").is_end_stream());
    }

    // The single in-memory chunk is yielded once, after which the body is exhausted.
    #[tokio::test]
    async fn http_body_consumes_data() {
        let mut owned = SdkBody::from("hello!");
        let mut body = Pin::new(&mut owned);
        assert!(!body.is_end_stream());
        assert!(body.next().await.is_some());
        assert!(body.next().await.is_none());
        assert!(body.is_end_stream());
    }

    #[tokio::test]
    async fn empty_body_returns_none() {
        // It's important to avoid sending empty chunks of data to avoid H2 data frame problems
        let mut owned = SdkBody::from("");
        let mut body = Pin::new(&mut owned);
        assert!(body.next().await.is_none());
    }

    // The Debug output names the in-memory variant.
    #[test]
    fn sdkbody_debug_once() {
        let rendered = format!("{:?}", SdkBody::from("123"));
        assert!(rendered.contains("Once"));
    }

    // Compile-time check: `SdkBody` must be `Send`.
    #[test]
    fn sdk_body_is_send() {
        fn is_send<T: Send>() {}
        is_send::<SdkBody>()
    }
}