aws_smithy_runtime/client/http/body/
minimum_throughput.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! A body-wrapping type that ensures data is being streamed faster than some lower limit.
7//!
8//! If data is being streamed too slowly, this body type will emit an error next time it's polled.
9
10/// An implementation of v0.4 `http_body::Body` for `MinimumThroughputBody` and related code.
11pub mod http_body_0_4_x;
12
13/// Options for a [`MinimumThroughputBody`].
14pub mod options;
15pub use throughput::Throughput;
16mod throughput;
17
18use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
19use aws_smithy_async::rt::sleep::Sleep;
20use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
21use aws_smithy_async::time::{SharedTimeSource, TimeSource};
22use aws_smithy_runtime_api::{
23    box_error::BoxError,
24    client::{
25        http::HttpConnectorFuture, result::ConnectorError, runtime_components::RuntimeComponents,
26        stalled_stream_protection::StalledStreamProtectionConfig,
27    },
28};
29use aws_smithy_runtime_api::{client::orchestrator::HttpResponse, shared::IntoShared};
30use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
31use options::MinimumThroughputBodyOptions;
32use std::{
33    fmt,
34    sync::{Arc, Mutex},
35    task::Poll,
36};
37use std::{future::Future, pin::Pin};
38use std::{
39    task::Context,
40    time::{Duration, SystemTime},
41};
42use throughput::ThroughputLogs;
43
44/// Use [`MinimumThroughputDownloadBody`] instead.
45#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
46pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;
47
48pin_project_lite::pin_project! {
49    /// A body-wrapping type that ensures data is being streamed faster than some lower limit.
50    ///
51    /// If data is being streamed too slowly, this body type will emit an error next time it's polled.
52    pub struct MinimumThroughputDownloadBody<B> {
53        async_sleep: SharedAsyncSleep,
54        time_source: SharedTimeSource,
55        options: MinimumThroughputBodyOptions,
56        throughput_logs: ThroughputLogs,
57        resolution: Duration,
58        #[pin]
59        sleep_fut: Option<Sleep>,
60        #[pin]
61        grace_period_fut: Option<Sleep>,
62        #[pin]
63        inner: B,
64    }
65}
66
67impl<B> MinimumThroughputDownloadBody<B> {
68    /// Create a new minimum throughput body.
69    pub fn new(
70        time_source: impl TimeSource + 'static,
71        async_sleep: impl AsyncSleep + 'static,
72        body: B,
73        options: MinimumThroughputBodyOptions,
74    ) -> Self {
75        let time_source: SharedTimeSource = time_source.into_shared();
76        let now = time_source.now();
77        let throughput_logs = ThroughputLogs::new(options.check_window(), now);
78        let resolution = throughput_logs.resolution();
79        Self {
80            throughput_logs,
81            resolution,
82            async_sleep: async_sleep.into_shared(),
83            time_source,
84            inner: body,
85            sleep_fut: None,
86            grace_period_fut: None,
87            options,
88        }
89    }
90}
91
92#[derive(Debug, PartialEq)]
93enum Error {
94    ThroughputBelowMinimum {
95        expected: Throughput,
96        actual: Throughput,
97    },
98}
99
100impl fmt::Display for Error {
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102        match self {
103            Self::ThroughputBelowMinimum { expected, actual } => {
104                write!(
105                    f,
106                    "minimum throughput was specified at {expected}, but throughput of {actual} was observed",
107                )
108            }
109        }
110    }
111}
112
113impl std::error::Error for Error {}
114
115/// Used to store the upload throughput in the interceptor context.
116#[derive(Clone, Debug)]
117pub(crate) struct UploadThroughput {
118    logs: Arc<Mutex<ThroughputLogs>>,
119}
120
121impl UploadThroughput {
122    pub(crate) fn new(time_window: Duration, now: SystemTime) -> Self {
123        Self {
124            logs: Arc::new(Mutex::new(ThroughputLogs::new(time_window, now))),
125        }
126    }
127
128    pub(crate) fn resolution(&self) -> Duration {
129        self.logs.lock().unwrap().resolution()
130    }
131
132    pub(crate) fn push_pending(&self, now: SystemTime) {
133        self.logs.lock().unwrap().push_pending(now);
134    }
135    pub(crate) fn push_bytes_transferred(&self, now: SystemTime, bytes: u64) {
136        self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
137    }
138
139    pub(crate) fn mark_complete(&self) -> bool {
140        self.logs.lock().unwrap().mark_complete()
141    }
142
143    pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
144        self.logs.lock().unwrap().report(now)
145    }
146}
147
148impl Storable for UploadThroughput {
149    type Storer = StoreReplace<Self>;
150}
151
152pin_project_lite::pin_project! {
153    pub(crate) struct ThroughputReadingBody<B> {
154        time_source: SharedTimeSource,
155        throughput: UploadThroughput,
156        #[pin]
157        inner: B,
158    }
159}
160
161impl<B> ThroughputReadingBody<B> {
162    pub(crate) fn new(
163        time_source: SharedTimeSource,
164        throughput: UploadThroughput,
165        body: B,
166    ) -> Self {
167        Self {
168            time_source,
169            throughput,
170            inner: body,
171        }
172    }
173}
174
175const ZERO_THROUGHPUT: Throughput = Throughput::new_bytes_per_second(0);
176
177// Helper trait for interpretting the throughput report.
178trait UploadReport {
179    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput);
180}
181impl UploadReport for ThroughputReport {
182    fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
183        let throughput = match self {
184            // stream has been exhausted, stop tracking violations
185            ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
186            // If the report is incomplete, then we don't have enough data yet to
187            // decide if minimum throughput was violated.
188            ThroughputReport::Incomplete => {
189                tracing::trace!(
190                    "not enough data to decide if minimum throughput has been violated"
191                );
192                return (false, ZERO_THROUGHPUT);
193            }
194            // If most of the datapoints are Poll::Pending, then the user has stalled.
195            // In this case, we don't want to say minimum throughput was violated.
196            ThroughputReport::Pending => {
197                tracing::debug!(
198                    "the user has stalled; this will not become a minimum throughput violation"
199                );
200                return (false, ZERO_THROUGHPUT);
201            }
202            // If there has been no polling, then the server has stalled. Alternatively,
203            // if we're transferring data, but it's too slow, then we also want to say
204            // that the minimum throughput has been violated.
205            ThroughputReport::NoPolling => ZERO_THROUGHPUT,
206            ThroughputReport::Transferred(tp) => tp,
207        };
208        if throughput < minimum_throughput {
209            tracing::debug!(
210                "current throughput: {throughput} is below minimum: {minimum_throughput}"
211            );
212            (true, throughput)
213        } else {
214            (false, throughput)
215        }
216    }
217}
218
219pin_project_lite::pin_project! {
220    /// Future that pairs with [`UploadThroughput`] to add a minimum throughput
221    /// requirement to a request upload stream.
222    pub(crate) struct UploadThroughputCheckFuture {
223        #[pin]
224        response: HttpConnectorFuture,
225        #[pin]
226        check_interval: Option<Sleep>,
227        #[pin]
228        grace_period: Option<Sleep>,
229
230        time_source: SharedTimeSource,
231        sleep_impl: SharedAsyncSleep,
232        upload_throughput: UploadThroughput,
233        resolution: Duration,
234        options: MinimumThroughputBodyOptions,
235
236        failing_throughput: Option<Throughput>,
237    }
238}
239
240impl UploadThroughputCheckFuture {
241    fn new(
242        response: HttpConnectorFuture,
243        time_source: SharedTimeSource,
244        sleep_impl: SharedAsyncSleep,
245        upload_throughput: UploadThroughput,
246        options: MinimumThroughputBodyOptions,
247    ) -> Self {
248        let resolution = upload_throughput.resolution();
249        Self {
250            response,
251            check_interval: Some(sleep_impl.sleep(resolution)),
252            grace_period: None,
253            time_source,
254            sleep_impl,
255            upload_throughput,
256            resolution,
257            options,
258            failing_throughput: None,
259        }
260    }
261}
262
263impl Future for UploadThroughputCheckFuture {
264    type Output = Result<HttpResponse, ConnectorError>;
265
266    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267        let mut this = self.project();
268
269        if let Poll::Ready(output) = this.response.poll(cx) {
270            return Poll::Ready(output);
271        } else {
272            let mut below_minimum_throughput = false;
273            let check_interval_expired = this
274                .check_interval
275                .as_mut()
276                .as_pin_mut()
277                .expect("always set")
278                .poll(cx)
279                .is_ready();
280            if check_interval_expired {
281                // Set up the next check interval
282                *this.check_interval = Some(this.sleep_impl.sleep(*this.resolution));
283
284                // Wake so that the check interval future gets polled
285                // next time this poll method is called. If it never gets polled,
286                // then this task won't be woken to check again.
287                cx.waker().wake_by_ref();
288            }
289
290            let should_check = check_interval_expired || this.grace_period.is_some();
291            if should_check {
292                let now = this.time_source.now();
293                let report = this.upload_throughput.report(now);
294                let (violated, current_throughput) =
295                    report.minimum_throughput_violated(this.options.minimum_throughput());
296                below_minimum_throughput = violated;
297                if below_minimum_throughput && !this.failing_throughput.is_some() {
298                    *this.failing_throughput = Some(current_throughput);
299                } else if !below_minimum_throughput {
300                    *this.failing_throughput = None;
301                }
302            }
303
304            // If we kicked off a grace period and are now satisfied, clear out the grace period
305            if !below_minimum_throughput && this.grace_period.is_some() {
306                tracing::debug!("upload minimum throughput recovered during grace period");
307                *this.grace_period = None;
308            }
309            if below_minimum_throughput {
310                // Start a grace period if below minimum throughput
311                if this.grace_period.is_none() {
312                    tracing::debug!(
313                        grace_period=?this.options.grace_period(),
314                        "upload minimum throughput below configured minimum; starting grace period"
315                    );
316                    *this.grace_period = Some(this.sleep_impl.sleep(this.options.grace_period()));
317                }
318                // Check the grace period if one is already set and we're not satisfied
319                if let Some(grace_period) = this.grace_period.as_pin_mut() {
320                    if grace_period.poll(cx).is_ready() {
321                        tracing::debug!("grace period ended; timing out request");
322                        return Poll::Ready(Err(ConnectorError::timeout(
323                            Error::ThroughputBelowMinimum {
324                                expected: this.options.minimum_throughput(),
325                                actual: this
326                                    .failing_throughput
327                                    .expect("always set if there's a grace period"),
328                            }
329                            .into(),
330                        )));
331                    }
332                }
333            }
334        }
335        Poll::Pending
336    }
337}
338
339pin_project_lite::pin_project! {
340    #[project = EnumProj]
341    pub(crate) enum MaybeUploadThroughputCheckFuture {
342        Direct { #[pin] future: HttpConnectorFuture },
343        Checked { #[pin] future: UploadThroughputCheckFuture },
344    }
345}
346
347impl MaybeUploadThroughputCheckFuture {
348    pub(crate) fn new(
349        cfg: &mut ConfigBag,
350        components: &RuntimeComponents,
351        connector_future: HttpConnectorFuture,
352    ) -> Self {
353        if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
354            if sspcfg.is_enabled() {
355                let options = MinimumThroughputBodyOptions::from(sspcfg);
356                return Self::new_inner(
357                    connector_future,
358                    components.time_source(),
359                    components.sleep_impl(),
360                    cfg.interceptor_state().load::<UploadThroughput>().cloned(),
361                    Some(options),
362                );
363            }
364        }
365        tracing::debug!("no minimum upload throughput checks");
366        Self::new_inner(connector_future, None, None, None, None)
367    }
368
369    fn new_inner(
370        response: HttpConnectorFuture,
371        time_source: Option<SharedTimeSource>,
372        sleep_impl: Option<SharedAsyncSleep>,
373        upload_throughput: Option<UploadThroughput>,
374        options: Option<MinimumThroughputBodyOptions>,
375    ) -> Self {
376        match (time_source, sleep_impl, upload_throughput, options) {
377            (Some(time_source), Some(sleep_impl), Some(upload_throughput), Some(options)) => {
378                tracing::debug!(options=?options, "applying minimum upload throughput check future");
379                Self::Checked {
380                    future: UploadThroughputCheckFuture::new(
381                        response,
382                        time_source,
383                        sleep_impl,
384                        upload_throughput,
385                        options,
386                    ),
387                }
388            }
389            _ => Self::Direct { future: response },
390        }
391    }
392}
393
394impl Future for MaybeUploadThroughputCheckFuture {
395    type Output = Result<HttpResponse, ConnectorError>;
396
397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        match self.project() {
399            EnumProj::Direct { future } => future.poll(cx),
400            EnumProj::Checked { future } => future.poll(cx),
401        }
402    }
403}