aws_smithy_runtime/client/http/body/
minimum_throughput.rs
1pub mod http_body_0_4_x;
12
13pub mod options;
15pub use throughput::Throughput;
16mod throughput;
17
18use crate::client::http::body::minimum_throughput::throughput::ThroughputReport;
19use aws_smithy_async::rt::sleep::Sleep;
20use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
21use aws_smithy_async::time::{SharedTimeSource, TimeSource};
22use aws_smithy_runtime_api::{
23 box_error::BoxError,
24 client::{
25 http::HttpConnectorFuture, result::ConnectorError, runtime_components::RuntimeComponents,
26 stalled_stream_protection::StalledStreamProtectionConfig,
27 },
28};
29use aws_smithy_runtime_api::{client::orchestrator::HttpResponse, shared::IntoShared};
30use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
31use options::MinimumThroughputBodyOptions;
32use std::{
33 fmt,
34 sync::{Arc, Mutex},
35 task::Poll,
36};
37use std::{future::Future, pin::Pin};
38use std::{
39 task::Context,
40 time::{Duration, SystemTime},
41};
42use throughput::ThroughputLogs;
43
44#[deprecated(note = "Renamed to MinimumThroughputDownloadBody since it doesn't work for uploads")]
46pub type MinimumThroughputBody<B> = MinimumThroughputDownloadBody<B>;
47
48pin_project_lite::pin_project! {
49 pub struct MinimumThroughputDownloadBody<B> {
53 async_sleep: SharedAsyncSleep,
54 time_source: SharedTimeSource,
55 options: MinimumThroughputBodyOptions,
56 throughput_logs: ThroughputLogs,
57 resolution: Duration,
58 #[pin]
59 sleep_fut: Option<Sleep>,
60 #[pin]
61 grace_period_fut: Option<Sleep>,
62 #[pin]
63 inner: B,
64 }
65}
66
67impl<B> MinimumThroughputDownloadBody<B> {
68 pub fn new(
70 time_source: impl TimeSource + 'static,
71 async_sleep: impl AsyncSleep + 'static,
72 body: B,
73 options: MinimumThroughputBodyOptions,
74 ) -> Self {
75 let time_source: SharedTimeSource = time_source.into_shared();
76 let now = time_source.now();
77 let throughput_logs = ThroughputLogs::new(options.check_window(), now);
78 let resolution = throughput_logs.resolution();
79 Self {
80 throughput_logs,
81 resolution,
82 async_sleep: async_sleep.into_shared(),
83 time_source,
84 inner: body,
85 sleep_fut: None,
86 grace_period_fut: None,
87 options,
88 }
89 }
90}
91
92#[derive(Debug, PartialEq)]
93enum Error {
94 ThroughputBelowMinimum {
95 expected: Throughput,
96 actual: Throughput,
97 },
98}
99
100impl fmt::Display for Error {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 match self {
103 Self::ThroughputBelowMinimum { expected, actual } => {
104 write!(
105 f,
106 "minimum throughput was specified at {expected}, but throughput of {actual} was observed",
107 )
108 }
109 }
110 }
111}
112
113impl std::error::Error for Error {}
114
115#[derive(Clone, Debug)]
117pub(crate) struct UploadThroughput {
118 logs: Arc<Mutex<ThroughputLogs>>,
119}
120
121impl UploadThroughput {
122 pub(crate) fn new(time_window: Duration, now: SystemTime) -> Self {
123 Self {
124 logs: Arc::new(Mutex::new(ThroughputLogs::new(time_window, now))),
125 }
126 }
127
128 pub(crate) fn resolution(&self) -> Duration {
129 self.logs.lock().unwrap().resolution()
130 }
131
132 pub(crate) fn push_pending(&self, now: SystemTime) {
133 self.logs.lock().unwrap().push_pending(now);
134 }
135 pub(crate) fn push_bytes_transferred(&self, now: SystemTime, bytes: u64) {
136 self.logs.lock().unwrap().push_bytes_transferred(now, bytes);
137 }
138
139 pub(crate) fn mark_complete(&self) -> bool {
140 self.logs.lock().unwrap().mark_complete()
141 }
142
143 pub(crate) fn report(&self, now: SystemTime) -> ThroughputReport {
144 self.logs.lock().unwrap().report(now)
145 }
146}
147
148impl Storable for UploadThroughput {
149 type Storer = StoreReplace<Self>;
150}
151
152pin_project_lite::pin_project! {
153 pub(crate) struct ThroughputReadingBody<B> {
154 time_source: SharedTimeSource,
155 throughput: UploadThroughput,
156 #[pin]
157 inner: B,
158 }
159}
160
161impl<B> ThroughputReadingBody<B> {
162 pub(crate) fn new(
163 time_source: SharedTimeSource,
164 throughput: UploadThroughput,
165 body: B,
166 ) -> Self {
167 Self {
168 time_source,
169 throughput,
170 inner: body,
171 }
172 }
173}
174
175const ZERO_THROUGHPUT: Throughput = Throughput::new_bytes_per_second(0);
176
177trait UploadReport {
179 fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput);
180}
181impl UploadReport for ThroughputReport {
182 fn minimum_throughput_violated(self, minimum_throughput: Throughput) -> (bool, Throughput) {
183 let throughput = match self {
184 ThroughputReport::Complete => return (false, ZERO_THROUGHPUT),
186 ThroughputReport::Incomplete => {
189 tracing::trace!(
190 "not enough data to decide if minimum throughput has been violated"
191 );
192 return (false, ZERO_THROUGHPUT);
193 }
194 ThroughputReport::Pending => {
197 tracing::debug!(
198 "the user has stalled; this will not become a minimum throughput violation"
199 );
200 return (false, ZERO_THROUGHPUT);
201 }
202 ThroughputReport::NoPolling => ZERO_THROUGHPUT,
206 ThroughputReport::Transferred(tp) => tp,
207 };
208 if throughput < minimum_throughput {
209 tracing::debug!(
210 "current throughput: {throughput} is below minimum: {minimum_throughput}"
211 );
212 (true, throughput)
213 } else {
214 (false, throughput)
215 }
216 }
217}
218
219pin_project_lite::pin_project! {
220 pub(crate) struct UploadThroughputCheckFuture {
223 #[pin]
224 response: HttpConnectorFuture,
225 #[pin]
226 check_interval: Option<Sleep>,
227 #[pin]
228 grace_period: Option<Sleep>,
229
230 time_source: SharedTimeSource,
231 sleep_impl: SharedAsyncSleep,
232 upload_throughput: UploadThroughput,
233 resolution: Duration,
234 options: MinimumThroughputBodyOptions,
235
236 failing_throughput: Option<Throughput>,
237 }
238}
239
240impl UploadThroughputCheckFuture {
241 fn new(
242 response: HttpConnectorFuture,
243 time_source: SharedTimeSource,
244 sleep_impl: SharedAsyncSleep,
245 upload_throughput: UploadThroughput,
246 options: MinimumThroughputBodyOptions,
247 ) -> Self {
248 let resolution = upload_throughput.resolution();
249 Self {
250 response,
251 check_interval: Some(sleep_impl.sleep(resolution)),
252 grace_period: None,
253 time_source,
254 sleep_impl,
255 upload_throughput,
256 resolution,
257 options,
258 failing_throughput: None,
259 }
260 }
261}
262
263impl Future for UploadThroughputCheckFuture {
264 type Output = Result<HttpResponse, ConnectorError>;
265
266 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
267 let mut this = self.project();
268
269 if let Poll::Ready(output) = this.response.poll(cx) {
270 return Poll::Ready(output);
271 } else {
272 let mut below_minimum_throughput = false;
273 let check_interval_expired = this
274 .check_interval
275 .as_mut()
276 .as_pin_mut()
277 .expect("always set")
278 .poll(cx)
279 .is_ready();
280 if check_interval_expired {
281 *this.check_interval = Some(this.sleep_impl.sleep(*this.resolution));
283
284 cx.waker().wake_by_ref();
288 }
289
290 let should_check = check_interval_expired || this.grace_period.is_some();
291 if should_check {
292 let now = this.time_source.now();
293 let report = this.upload_throughput.report(now);
294 let (violated, current_throughput) =
295 report.minimum_throughput_violated(this.options.minimum_throughput());
296 below_minimum_throughput = violated;
297 if below_minimum_throughput && !this.failing_throughput.is_some() {
298 *this.failing_throughput = Some(current_throughput);
299 } else if !below_minimum_throughput {
300 *this.failing_throughput = None;
301 }
302 }
303
304 if !below_minimum_throughput && this.grace_period.is_some() {
306 tracing::debug!("upload minimum throughput recovered during grace period");
307 *this.grace_period = None;
308 }
309 if below_minimum_throughput {
310 if this.grace_period.is_none() {
312 tracing::debug!(
313 grace_period=?this.options.grace_period(),
314 "upload minimum throughput below configured minimum; starting grace period"
315 );
316 *this.grace_period = Some(this.sleep_impl.sleep(this.options.grace_period()));
317 }
318 if let Some(grace_period) = this.grace_period.as_pin_mut() {
320 if grace_period.poll(cx).is_ready() {
321 tracing::debug!("grace period ended; timing out request");
322 return Poll::Ready(Err(ConnectorError::timeout(
323 Error::ThroughputBelowMinimum {
324 expected: this.options.minimum_throughput(),
325 actual: this
326 .failing_throughput
327 .expect("always set if there's a grace period"),
328 }
329 .into(),
330 )));
331 }
332 }
333 }
334 }
335 Poll::Pending
336 }
337}
338
339pin_project_lite::pin_project! {
340 #[project = EnumProj]
341 pub(crate) enum MaybeUploadThroughputCheckFuture {
342 Direct { #[pin] future: HttpConnectorFuture },
343 Checked { #[pin] future: UploadThroughputCheckFuture },
344 }
345}
346
347impl MaybeUploadThroughputCheckFuture {
348 pub(crate) fn new(
349 cfg: &mut ConfigBag,
350 components: &RuntimeComponents,
351 connector_future: HttpConnectorFuture,
352 ) -> Self {
353 if let Some(sspcfg) = cfg.load::<StalledStreamProtectionConfig>().cloned() {
354 if sspcfg.is_enabled() {
355 let options = MinimumThroughputBodyOptions::from(sspcfg);
356 return Self::new_inner(
357 connector_future,
358 components.time_source(),
359 components.sleep_impl(),
360 cfg.interceptor_state().load::<UploadThroughput>().cloned(),
361 Some(options),
362 );
363 }
364 }
365 tracing::debug!("no minimum upload throughput checks");
366 Self::new_inner(connector_future, None, None, None, None)
367 }
368
369 fn new_inner(
370 response: HttpConnectorFuture,
371 time_source: Option<SharedTimeSource>,
372 sleep_impl: Option<SharedAsyncSleep>,
373 upload_throughput: Option<UploadThroughput>,
374 options: Option<MinimumThroughputBodyOptions>,
375 ) -> Self {
376 match (time_source, sleep_impl, upload_throughput, options) {
377 (Some(time_source), Some(sleep_impl), Some(upload_throughput), Some(options)) => {
378 tracing::debug!(options=?options, "applying minimum upload throughput check future");
379 Self::Checked {
380 future: UploadThroughputCheckFuture::new(
381 response,
382 time_source,
383 sleep_impl,
384 upload_throughput,
385 options,
386 ),
387 }
388 }
389 _ => Self::Direct { future: response },
390 }
391 }
392}
393
394impl Future for MaybeUploadThroughputCheckFuture {
395 type Output = Result<HttpResponse, ConnectorError>;
396
397 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398 match self.project() {
399 EnumProj::Direct { future } => future.poll(cx),
400 EnumProj::Checked { future } => future.poll(cx),
401 }
402 }
403}