aws_smithy_runtime/client/http/body/minimum_throughput/
throughput.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use std::fmt;
7use std::time::{Duration, SystemTime};
8
9/// Throughput representation for use when configuring [`super::MinimumThroughputBody`]
10#[derive(Debug, Clone, Copy)]
11#[cfg_attr(test, derive(Eq))]
12pub struct Throughput {
13    pub(super) bytes_read: u64,
14    pub(super) per_time_elapsed: Duration,
15}
16
17impl Throughput {
18    /// Create a new throughput with the given bytes read and time elapsed.
19    pub fn new(bytes_read: u64, per_time_elapsed: Duration) -> Self {
20        debug_assert!(
21            !per_time_elapsed.is_zero(),
22            "cannot create a throughput if per_time_elapsed == 0"
23        );
24
25        Self {
26            bytes_read,
27            per_time_elapsed,
28        }
29    }
30
31    /// Create a new throughput in bytes per second.
32    pub const fn new_bytes_per_second(bytes: u64) -> Self {
33        Self {
34            bytes_read: bytes,
35            per_time_elapsed: Duration::from_secs(1),
36        }
37    }
38
39    /// Create a new throughput in kilobytes per second.
40    pub const fn new_kilobytes_per_second(kilobytes: u64) -> Self {
41        Self {
42            bytes_read: kilobytes * 1000,
43            per_time_elapsed: Duration::from_secs(1),
44        }
45    }
46
47    /// Create a new throughput in megabytes per second.
48    pub const fn new_megabytes_per_second(megabytes: u64) -> Self {
49        Self {
50            bytes_read: megabytes * 1000 * 1000,
51            per_time_elapsed: Duration::from_secs(1),
52        }
53    }
54
55    pub(super) fn bytes_per_second(&self) -> f64 {
56        let per_time_elapsed_secs = self.per_time_elapsed.as_secs_f64();
57        if per_time_elapsed_secs == 0.0 {
58            return 0.0; // Avoid dividing by zero.
59        };
60
61        self.bytes_read as f64 / per_time_elapsed_secs
62    }
63}
64
65impl PartialEq for Throughput {
66    fn eq(&self, other: &Self) -> bool {
67        self.bytes_per_second() == other.bytes_per_second()
68    }
69}
70
71impl PartialOrd for Throughput {
72    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73        self.bytes_per_second()
74            .partial_cmp(&other.bytes_per_second())
75    }
76}
77
78impl fmt::Display for Throughput {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        // The default float formatting behavior will ensure the a number like 2.000 is rendered as 2
81        // while a number like 0.9982107441748642 will be rendered as 0.9982107441748642. This
82        // multiplication and division will truncate a float to have a precision of no greater than 3.
83        // For example, 0.9982107441748642 would become 0.999. This will fail for very large floats
84        // but should suffice for the numbers we're dealing with.
85        let pretty_bytes_per_second = (self.bytes_per_second() * 1000.0).round() / 1000.0;
86
87        write!(f, "{pretty_bytes_per_second} B/s")
88    }
89}
90
91impl From<(u64, Duration)> for Throughput {
92    fn from(value: (u64, Duration)) -> Self {
93        Self {
94            bytes_read: value.0,
95            per_time_elapsed: value.1,
96        }
97    }
98}
99
/// Classification of what happened during a single bin.
///
/// The derived ordering doubles as a priority ranking: when two labels compete
/// for the same bin, the greater one wins.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BinLabel {
    // IMPORTANT: Do not reorder these variants. The derived `Ord` follows
    // declaration order, which yields the priority:
    // TransferredBytes > Pending > NoPolling > Empty
    //
    /// Nothing has been recorded in this bin.
    Empty,

    /// The stream was not polled at all during this bin.
    NoPolling,

    /// The stream was polled during this bin, but the user/remote was not
    /// providing/consuming data fast enough.
    Pending,

    /// Bytes were transferred during this bin.
    TransferredBytes,
}
118
119/// Represents a bin (or a cell) in a linear grid that represents a small chunk of time.
120#[derive(Copy, Clone, Debug)]
121struct Bin {
122    label: BinLabel,
123    bytes: u64,
124}
125
126impl Bin {
127    const fn new(label: BinLabel, bytes: u64) -> Self {
128        Self { label, bytes }
129    }
130    const fn empty() -> Self {
131        Self::new(BinLabel::Empty, 0)
132    }
133
134    fn is_empty(&self) -> bool {
135        matches!(self.label, BinLabel::Empty)
136    }
137
138    fn merge(&mut self, other: Bin) -> &mut Self {
139        // Assign values based on this priority order (highest priority higher up):
140        //   1. TransferredBytes
141        //   2. Pending
142        //   3. NoPolling
143        //   4. Empty
144        self.label = if other.label > self.label {
145            other.label
146        } else {
147            self.label
148        };
149        self.bytes += other.bytes;
150        self
151    }
152
153    /// Number of bytes transferred during this bin
154    fn bytes(&self) -> u64 {
155        self.bytes
156    }
157}
158
/// Tally of how many bins carry each label.
///
/// All counts start at zero via `Default`.
#[derive(Clone, Copy, Debug, Default)]
struct BinCounts {
    /// How many bins hold no data.
    empty: usize,
    /// How many bins are labeled "no polling".
    no_polling: usize,
    /// How many bins are labeled "bytes transferred".
    transferred: usize,
    /// How many bins are labeled "pending".
    pending: usize,
}
170
171/// Underlying stack-allocated linear grid buffer for tracking
172/// throughput events for [`ThroughputLogs`].
173#[derive(Copy, Clone, Debug)]
174struct LogBuffer<const N: usize> {
175    entries: [Bin; N],
176    // The length only needs to exist so that the `fill_gaps` function
177    // can differentiate between `Empty` due to there not having been enough
178    // time to establish a full buffer worth of data vs. `Empty` due to a
179    // polling gap. Once the length reaches N, it will never change again.
180    length: usize,
181}
182
183impl<const N: usize> LogBuffer<N> {
184    fn new() -> Self {
185        Self {
186            entries: [Bin::empty(); N],
187            length: 0,
188        }
189    }
190
191    /// Mutably returns the tail of the buffer.
192    ///
193    /// ## Panics
194    ///
195    /// The buffer MUST have at least one bin in it before this is called.
196    fn tail_mut(&mut self) -> &mut Bin {
197        debug_assert!(self.length > 0);
198        &mut self.entries[self.length - 1]
199    }
200
201    /// Pushes a bin into the buffer. If the buffer is already full,
202    /// then this will rotate the entire buffer to the left.
203    fn push(&mut self, bin: Bin) {
204        if self.filled() {
205            self.entries.rotate_left(1);
206            self.entries[N - 1] = bin;
207        } else {
208            self.entries[self.length] = bin;
209            self.length += 1;
210        }
211    }
212
213    /// Returns the total number of bytes transferred within the time window.
214    fn bytes_transferred(&self) -> u64 {
215        self.entries.iter().take(self.length).map(Bin::bytes).sum()
216    }
217
218    #[inline]
219    fn filled(&self) -> bool {
220        self.length == N
221    }
222
223    /// Fills in missing NoData entries.
224    ///
225    /// We want NoData entries to represent when a future hasn't been polled.
226    /// Since the future is in charge of logging in the first place, the only
227    /// way we can know about these is by examining gaps in time.
228    fn fill_gaps(&mut self) {
229        for entry in self.entries.iter_mut().take(self.length) {
230            if entry.is_empty() {
231                *entry = Bin::new(BinLabel::NoPolling, 0);
232            }
233        }
234    }
235
236    /// Returns the counts of each bin type in the buffer.
237    fn counts(&self) -> BinCounts {
238        let mut counts = BinCounts::default();
239        for entry in &self.entries {
240            match entry.label {
241                BinLabel::Empty => counts.empty += 1,
242                BinLabel::NoPolling => counts.no_polling += 1,
243                BinLabel::TransferredBytes => counts.transferred += 1,
244                BinLabel::Pending => counts.pending += 1,
245            }
246        }
247        counts
248    }
249
250    /// If this LogBuffer is empty, returns `true`. Else, returns `false`.
251    fn is_empty(&self) -> bool {
252        self.length == 0
253    }
254}
255
256/// Report/summary of all the events in a time window.
257#[cfg_attr(test, derive(Debug, Eq, PartialEq))]
258pub(crate) enum ThroughputReport {
259    /// Not enough data to draw any conclusions. This happens early in a request/response.
260    Incomplete,
261    /// The stream hasn't been polled for most of this time window.
262    NoPolling,
263    /// The stream has been waiting for most of the time window.
264    Pending,
265    /// The stream transferred this amount of throughput during the time window.
266    Transferred(Throughput),
267    /// The stream has completed, no more data is expected.
268    Complete,
269}
270
/// Number of bins that a throughput time window is divided into.
const BIN_COUNT: usize = 10;
272
273/// Log of throughput in a request or response stream.
274///
275/// Used to determine if a configured minimum throughput is being met or not
276/// so that a request or response stream can be timed out in the event of a
277/// stall.
278///
279/// Request/response streams push data transfer or pending events to this log
280/// based on what's going on in their poll functions. The log tracks three kinds
281/// of events despite only receiving two: the third is "no polling". The poll
282/// functions cannot know when they're not being polled, so the log examines gaps
283/// in the event history to know when no polling took place.
284///
285/// The event logging is simplified down to a linear grid consisting of 10 "bins",
286/// with each bin representing 1/10th the total time window. When an event is pushed,
287/// it is either merged into the current tail bin, or all the bins are rotated
288/// left to create a new empty tail bin, and then it is merged into that one.
289#[derive(Clone, Debug)]
290pub(super) struct ThroughputLogs {
291    resolution: Duration,
292    current_tail: SystemTime,
293    buffer: LogBuffer<BIN_COUNT>,
294    stream_complete: bool,
295}
296
297impl ThroughputLogs {
298    /// Creates a new log starting at `now` with the given `time_window`.
299    ///
300    /// Note: the `time_window` gets divided by 10 to create smaller sub-windows
301    /// to track throughput. The time window should be configured to be large enough
302    /// so that these sub-windows aren't too small for network-based events.
303    /// A time window of 10ms probably won't work, but 500ms might. The default
304    /// is one second.
305    pub(super) fn new(time_window: Duration, now: SystemTime) -> Self {
306        assert!(!time_window.is_zero());
307        let resolution = time_window.div_f64(BIN_COUNT as f64);
308        Self {
309            resolution,
310            current_tail: now,
311            buffer: LogBuffer::new(),
312            stream_complete: false,
313        }
314    }
315
316    /// Returns the resolution at which events are logged at.
317    ///
318    /// The resolution is the number of bins in the time window.
319    pub(super) fn resolution(&self) -> Duration {
320        self.resolution
321    }
322
323    /// Pushes a "pending" event.
324    ///
325    /// Pending indicates the streaming future is waiting for something.
326    /// In an upload, it is waiting for data from the user, and in a download,
327    /// it is waiting for data from the server.
328    pub(super) fn push_pending(&mut self, time: SystemTime) {
329        self.push(time, Bin::new(BinLabel::Pending, 0));
330    }
331
332    /// Pushes a data transferred event.
333    ///
334    /// Indicates that this number of bytes were transferred at this time.
335    pub(super) fn push_bytes_transferred(&mut self, time: SystemTime, bytes: u64) {
336        self.push(time, Bin::new(BinLabel::TransferredBytes, bytes));
337    }
338
339    fn push(&mut self, now: SystemTime, value: Bin) {
340        self.catch_up(now);
341        if self.buffer.is_empty() {
342            self.buffer.push(value)
343        } else {
344            self.buffer.tail_mut().merge(value);
345        }
346        self.buffer.fill_gaps();
347    }
348
349    /// Pushes empty bins until `current_tail` is caught up to `now`.
350    fn catch_up(&mut self, now: SystemTime) {
351        while now >= self.current_tail {
352            self.current_tail += self.resolution;
353            self.buffer.push(Bin::empty());
354        }
355        assert!(self.current_tail >= now);
356    }
357
358    /// Mark the stream complete indicating no more data is expected. This is an
359    /// idempotent operation -- subsequent invocations of this function have no effect
360    /// and return false.
361    ///
362    /// After marking a stream complete [report](#method.report) will forever more return
363    /// [ThroughputReport::Complete]
364    pub(super) fn mark_complete(&mut self) -> bool {
365        let prev = self.stream_complete;
366        self.stream_complete = true;
367        !prev
368    }
369
370    /// Generates an overall report of the time window.
371    pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
372        if self.stream_complete {
373            return ThroughputReport::Complete;
374        }
375
376        self.catch_up(now);
377        self.buffer.fill_gaps();
378
379        let BinCounts {
380            empty,
381            no_polling,
382            transferred,
383            pending,
384        } = self.buffer.counts();
385
386        // If there are any empty cells at all, then we haven't been tracking
387        // long enough to make any judgements about the stream's progress.
388        if empty > 0 {
389            return ThroughputReport::Incomplete;
390        }
391
392        let bytes = self.buffer.bytes_transferred();
393        let time = self.resolution * (BIN_COUNT - empty) as u32;
394        let throughput = Throughput::new(bytes, time);
395
396        let half = BIN_COUNT / 2;
397        match (transferred > 0, no_polling >= half, pending >= half) {
398            (true, _, _) => ThroughputReport::Transferred(throughput),
399            (_, true, _) => ThroughputReport::NoPolling,
400            (_, _, true) => ThroughputReport::Pending,
401            _ => ThroughputReport::Incomplete,
402        }
403    }
404}
405
/// Unit tests for `Throughput` equality, `BinLabel` priority, and the reports
/// produced by `ThroughputLogs`.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    // The derived ordering of `BinLabel` is what `Bin::merge` relies on.
    #[test]
    fn test_log_buffer_bin_label_priority() {
        use BinLabel::*;
        assert!(Empty < NoPolling);
        assert!(NoPolling < Pending);
        assert!(Pending < TransferredBytes);
    }

    // Throughputs with the same rate are equal regardless of how they are expressed.
    #[test]
    fn test_throughput_eq() {
        let t1 = Throughput::new(1, Duration::from_secs(1));
        let t2 = Throughput::new(25, Duration::from_secs(25));
        let t3 = Throughput::new(100, Duration::from_secs(100));

        assert_eq!(t1, t2);
        assert_eq!(t2, t3);
    }

    #[test]
    fn incomplete_no_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start);
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);
        logs.push_bytes_transferred(start + Duration::from_millis(100), 10);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn push_pending_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_pending(now);
            now += logs.resolution();

            assert_eq!(i, logs.buffer.counts().pending);
        }

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_pending_at_the_end_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..BIN_COUNT {
            now += logs.resolution();
            logs.push_pending(now);

            assert_eq!(i, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().transferred);
            assert_eq!(1, logs.buffer.counts().no_polling);
        }
        // This should replace the initial "no polling" bin
        now += logs.resolution();
        logs.push_pending(now);
        assert_eq!(0, logs.buffer.counts().no_polling);

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_transferred_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_bytes_transferred(now, 10);
            if i != BIN_COUNT {
                now += logs.resolution();
            }

            assert_eq!(i, logs.buffer.counts().transferred);
            assert_eq!(0, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().no_polling);
        }

        let report = logs.report(now);
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new(100, Duration::from_secs(1))),
            report
        );
    }

    #[test]
    fn no_polling() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start + Duration::from_secs(2));
        assert_eq!(ThroughputReport::NoPolling, report);
    }

    // Transferred bytes MUST take priority over pending when reporting throughput
    #[test]
    fn mixed_bag_mostly_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_bytes_transferred(start + Duration::from_millis(50), 10);
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        logs.push_bytes_transferred(start + Duration::from_millis(350), 10);
        logs.push_pending(start + Duration::from_millis(450));
        // skip 550
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new_bytes_per_second(20)),
            report
        );
    }

    #[test]
    fn mixed_bag_mostly_pending_no_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_pending(start + Duration::from_millis(50));
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        // skip 350
        logs.push_pending(start + Duration::from_millis(450));
        // skip 550
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(ThroughputReport::Pending, report);
    }

    // Pushing an event timestamped before the log's start time must not panic.
    #[test]
    fn test_first_push_succeeds_although_time_window_has_not_elapsed() {
        let t0 = SystemTime::UNIX_EPOCH;
        let t1 = t0 + Duration::from_secs(1);
        let mut tl = ThroughputLogs::new(Duration::from_secs(1), t1);

        tl.push_pending(t0);
    }

    #[test]
    fn test_label_transferred_bytes_should_not_be_overwritten_by_pending() {
        let start = SystemTime::UNIX_EPOCH;
        // Each `Bin`'s resolution is 100ms (1s / BIN_COUNT), where `BIN_COUNT` is 10
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        // push `TransferredBytes` and then `Pending` in the same first `Bin`
        logs.push_bytes_transferred(start + Duration::from_millis(10), 10);
        logs.push_pending(start + Duration::from_millis(20));

        let BinCounts {
            empty,
            no_polling,
            transferred,
            pending,
        } = logs.buffer.counts();

        assert_eq!(9, empty);
        assert_eq!(0, no_polling);
        assert_eq!(1, transferred); // `transferred` should still be there
        assert_eq!(0, pending); // while `pending` should cease to exist, failing to overwrite `transferred`
    }
}