1use bytes::Bytes;
9use pin_project_lite::pin_project;
10use std::error::Error as StdError;
11use std::fmt::{self, Debug, Formatter};
12use std::future::poll_fn;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
17#[cfg(feature = "http-body-0-4-x")]
21pub mod http_body_0_4_x;
22#[cfg(feature = "http-body-1-x")]
23pub mod http_body_1_x;
24
/// Boxed, thread-safe error type used for failures reported while reading a body.
pub type Error = Box<dyn StdError + Send + Sync + 'static>;
27
28pin_project! {
29 pub struct SdkBody {
36 #[pin]
37 inner: Inner,
38 rebuild: Option<Arc<dyn (Fn() -> Inner) + Send + Sync>>,
43 bytes_contents: Option<Bytes>
44 }
45}
46
47impl Debug for SdkBody {
48 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49 f.debug_struct("SdkBody")
50 .field("inner", &self.inner)
51 .field("retryable", &self.rebuild.is_some())
52 .finish()
53 }
54}
55
/// Type-erased streaming body stored inside [`Inner::Dyn`].
enum BoxBody {
    // Compiled in when any one of the three listed features is enabled. With none of
    // them enabled this enum has no variants.
    #[cfg(any(
        feature = "http-body-0-4-x",
        feature = "http-body-1-x",
        feature = "rt-tokio"
    ))]
    HttpBody04(#[allow(dead_code)] http_body_0_4::combinators::BoxBody<Bytes, Error>),
}
68
69pin_project! {
70 #[project = InnerProj]
71 enum Inner {
72 Once {
74 inner: Option<Bytes>
75 },
76 Dyn {
78 #[pin]
79 inner: BoxBody,
80 },
81
82 Taken,
86 }
87}
88
89impl Debug for Inner {
90 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91 match &self {
92 Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
93 Inner::Dyn { .. } => write!(f, "BoxBody"),
94 Inner::Taken => f.debug_tuple("Taken").finish(),
95 }
96 }
97}
98
99impl SdkBody {
100 pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
109 let initial = f();
110 SdkBody {
111 inner: initial.inner,
112 rebuild: Some(Arc::new(move || f().inner)),
113 bytes_contents: initial.bytes_contents,
114 }
115 }
116
117 pub fn taken() -> Self {
120 Self {
121 inner: Inner::Taken,
122 rebuild: None,
123 bytes_contents: None,
124 }
125 }
126
127 pub fn empty() -> Self {
129 Self {
130 inner: Inner::Once { inner: None },
131 rebuild: Some(Arc::new(|| Inner::Once { inner: None })),
132 bytes_contents: Some(Bytes::new()),
133 }
134 }
135
136 pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
137 let mut me = Pin::new(self);
138 poll_fn(|cx| me.as_mut().poll_next(cx)).await
139 }
140
141 pub(crate) fn poll_next(
142 self: Pin<&mut Self>,
143 #[allow(unused)] cx: &mut Context<'_>,
144 ) -> Poll<Option<Result<Bytes, Error>>> {
145 let this = self.project();
146 match this.inner.project() {
147 InnerProj::Once { ref mut inner } => {
148 let data = inner.take();
149 match data {
150 Some(bytes) if bytes.is_empty() => Poll::Ready(None),
151 Some(bytes) => Poll::Ready(Some(Ok(bytes))),
152 None => Poll::Ready(None),
153 }
154 }
155 InnerProj::Dyn { inner: body } => match body.get_mut() {
156 #[cfg(feature = "http-body-0-4-x")]
157 BoxBody::HttpBody04(box_body) => {
158 use http_body_0_4::Body;
159 Pin::new(box_body).poll_data(cx)
160 }
161 #[allow(unreachable_patterns)]
162 _ => unreachable!(
163 "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
164 ),
165 },
166 InnerProj::Taken => {
167 Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
168 }
169 }
170 }
171
/// Wraps an `http-body` 0.4 body as a streaming `SdkBody`.
///
/// The body's error type is converted into [`Error`] and the body is boxed. The
/// result has no rebuild function and no in-memory contents.
#[cfg(any(
    feature = "http-body-0-4-x",
    feature = "http-body-1-x",
    feature = "rt-tokio"
))]
pub(crate) fn from_body_0_4_internal<T, E>(body: T) -> Self
where
    T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
    E: Into<Error> + 'static,
{
    let boxed = http_body_0_4::combinators::BoxBody::new(body.map_err(Into::into));
    let inner = Inner::Dyn {
        inner: BoxBody::HttpBody04(boxed),
    };
    Self {
        inner,
        rebuild: None,
        bytes_contents: None,
    }
}
192
/// Polls for the body's trailers.
///
/// * `Once`: there are never trailers, so this is immediately `Ok(None)`.
/// * `Dyn`: delegates to the boxed body's `poll_trailers`.
/// * `Taken`: yields an error.
#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
pub(crate) fn poll_next_trailers(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Error>> {
    use http_body_0_4::Body;

    let projected = self.project();
    match projected.inner.project() {
        InnerProj::Once { .. } => Poll::Ready(Ok(None)),
        InnerProj::Dyn { inner } => {
            // `HttpBody04` is the only variant under this cfg, so the pattern is irrefutable.
            let BoxBody::HttpBody04(box_body) = inner.get_mut();
            Pin::new(box_body).poll_trailers(cx)
        }
        InnerProj::Taken => Poll::Ready(Err(
            "A `Taken` body should never be polled for trailers".into(),
        )),
    }
}
212
213 pub fn bytes(&self) -> Option<&[u8]> {
218 match &self.bytes_contents {
219 Some(b) => Some(b),
220 None => None,
221 }
222 }
223
224 pub fn try_clone(&self) -> Option<Self> {
227 self.rebuild.as_ref().map(|rebuild| {
228 let next = rebuild();
229 Self {
230 inner: next,
231 rebuild: self.rebuild.clone(),
232 bytes_contents: self.bytes_contents.clone(),
233 }
234 })
235 }
236
237 pub fn is_streaming(&self) -> bool {
239 matches!(self.inner, Inner::Dyn { .. })
240 }
241
242 pub fn content_length(&self) -> Option<u64> {
245 match self.bounds_on_remaining_length() {
246 (lo, Some(hi)) if lo == hi => Some(lo),
247 _ => None,
248 }
249 }
250
251 #[allow(dead_code)] pub(crate) fn is_end_stream(&self) -> bool {
253 match &self.inner {
254 Inner::Once { inner: None } => true,
255 Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
256 Inner::Dyn { inner: box_body } => match box_body {
257 #[cfg(feature = "http-body-0-4-x")]
258 BoxBody::HttpBody04(box_body) => {
259 use http_body_0_4::Body;
260 box_body.is_end_stream()
261 }
262 #[allow(unreachable_patterns)]
263 _ => unreachable!(
264 "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
265 ),
266 },
267 Inner::Taken => true,
268 }
269 }
270
271 pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
272 match &self.inner {
273 Inner::Once { inner: None } => (0, Some(0)),
274 Inner::Once { inner: Some(bytes) } => {
275 let len = bytes.len() as u64;
276 (len, Some(len))
277 }
278 Inner::Dyn { inner: box_body } => match box_body {
279 #[cfg(feature = "http-body-0-4-x")]
280 BoxBody::HttpBody04(box_body) => {
281 use http_body_0_4::Body;
282 let hint = box_body.size_hint();
283 (hint.lower(), hint.upper())
284 }
285 #[allow(unreachable_patterns)]
286 _ => unreachable!(
287 "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
288 ),
289 },
290 Inner::Taken => (0, Some(0)),
291 }
292 }
293
294 pub fn map(self, f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static) -> SdkBody {
297 if self.rebuild.is_some() {
298 SdkBody::retryable(move || f(self.try_clone().unwrap()))
299 } else {
300 f(self)
301 }
302 }
303
304 pub fn map_preserve_contents(
310 self,
311 f: impl Fn(SdkBody) -> SdkBody + Sync + Send + 'static,
312 ) -> SdkBody {
313 let contents = self.bytes_contents.clone();
314 let mut out = if self.rebuild.is_some() {
315 SdkBody::retryable(move || f(self.try_clone().unwrap()))
316 } else {
317 f(self)
318 };
319 out.bytes_contents = contents;
320 out
321 }
322}
323
324impl From<&str> for SdkBody {
325 fn from(s: &str) -> Self {
326 Self::from(s.as_bytes())
327 }
328}
329
330impl From<Bytes> for SdkBody {
331 fn from(bytes: Bytes) -> Self {
332 let b = bytes.clone();
333 SdkBody {
334 inner: Inner::Once {
335 inner: Some(bytes.clone()),
336 },
337 rebuild: Some(Arc::new(move || Inner::Once {
338 inner: Some(bytes.clone()),
339 })),
340 bytes_contents: Some(b),
341 }
342 }
343}
344
345impl From<Vec<u8>> for SdkBody {
346 fn from(data: Vec<u8>) -> Self {
347 Self::from(Bytes::from(data))
348 }
349}
350
351impl From<String> for SdkBody {
352 fn from(s: String) -> Self {
353 Self::from(s.into_bytes())
354 }
355}
356
357impl From<&[u8]> for SdkBody {
358 fn from(data: &[u8]) -> Self {
359 Self::from(Bytes::copy_from_slice(data))
360 }
361}
362
#[cfg(test)]
mod test {
    use crate::body::SdkBody;
    use std::pin::Pin;

    // In-memory bodies report their exact length, including zero.
    #[test]
    fn valid_size_hint() {
        let hello = SdkBody::from("hello");
        assert_eq!(hello.content_length(), Some(5));

        let blank = SdkBody::from("");
        assert_eq!(blank.content_length(), Some(0));
    }

    // A non-empty body is not at end of stream; an empty one is.
    #[test]
    fn valid_eos() {
        let hello = SdkBody::from("hello");
        assert!(!hello.is_end_stream());

        let blank = SdkBody::from("");
        assert!(blank.is_end_stream());
    }

    // Reading the single chunk leaves the body at end of stream.
    #[tokio::test]
    async fn http_body_consumes_data() {
        let mut owned = SdkBody::from("hello!");
        let mut body = Pin::new(&mut owned);
        assert!(!body.is_end_stream());

        let first = body.next().await;
        assert!(first.is_some());

        let second = body.next().await;
        assert!(second.is_none());
        assert!(body.is_end_stream());
    }

    // An empty body yields no chunks at all.
    #[tokio::test]
    async fn empty_body_returns_none() {
        let mut owned = SdkBody::from("");
        let mut body = Pin::new(&mut owned);
        let chunk = body.next().await;
        assert!(chunk.is_none());
    }

    // The debug output of an in-memory body names the `Once` state.
    #[test]
    fn sdkbody_debug_once() {
        let rendered = format!("{:?}", SdkBody::from("123"));
        assert!(rendered.contains("Once"));
    }

    // Compile-time check that `SdkBody` can be sent across threads.
    #[test]
    fn sdk_body_is_send() {
        fn is_send<T: Send>() {}
        is_send::<SdkBody>()
    }
}