1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
56use std::fmt;
7use std::time::{Duration, SystemTime};
89/// Throughput representation for use when configuring [`super::MinimumThroughputBody`]
10#[derive(Debug, Clone, Copy)]
11#[cfg_attr(test, derive(Eq))]
12pub struct Throughput {
13pub(super) bytes_read: u64,
14pub(super) per_time_elapsed: Duration,
15}
1617impl Throughput {
18/// Create a new throughput with the given bytes read and time elapsed.
19pub fn new(bytes_read: u64, per_time_elapsed: Duration) -> Self {
20debug_assert!(
21 !per_time_elapsed.is_zero(),
22"cannot create a throughput if per_time_elapsed == 0"
23);
2425Self {
26 bytes_read,
27 per_time_elapsed,
28 }
29 }
3031/// Create a new throughput in bytes per second.
32pub const fn new_bytes_per_second(bytes: u64) -> Self {
33Self {
34 bytes_read: bytes,
35 per_time_elapsed: Duration::from_secs(1),
36 }
37 }
3839/// Create a new throughput in kilobytes per second.
40pub const fn new_kilobytes_per_second(kilobytes: u64) -> Self {
41Self {
42 bytes_read: kilobytes * 1000,
43 per_time_elapsed: Duration::from_secs(1),
44 }
45 }
4647/// Create a new throughput in megabytes per second.
48pub const fn new_megabytes_per_second(megabytes: u64) -> Self {
49Self {
50 bytes_read: megabytes * 1000 * 1000,
51 per_time_elapsed: Duration::from_secs(1),
52 }
53 }
5455pub(super) fn bytes_per_second(&self) -> f64 {
56let per_time_elapsed_secs = self.per_time_elapsed.as_secs_f64();
57if per_time_elapsed_secs == 0.0 {
58return 0.0; // Avoid dividing by zero.
59};
6061self.bytes_read as f64 / per_time_elapsed_secs
62 }
63}
6465impl PartialEq for Throughput {
66fn eq(&self, other: &Self) -> bool {
67self.bytes_per_second() == other.bytes_per_second()
68 }
69}
7071impl PartialOrd for Throughput {
72fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73self.bytes_per_second()
74 .partial_cmp(&other.bytes_per_second())
75 }
76}
7778impl fmt::Display for Throughput {
79fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80// The default float formatting behavior will ensure the a number like 2.000 is rendered as 2
81 // while a number like 0.9982107441748642 will be rendered as 0.9982107441748642. This
82 // multiplication and division will truncate a float to have a precision of no greater than 3.
83 // For example, 0.9982107441748642 would become 0.999. This will fail for very large floats
84 // but should suffice for the numbers we're dealing with.
85let pretty_bytes_per_second = (self.bytes_per_second() * 1000.0).round() / 1000.0;
8687write!(f, "{pretty_bytes_per_second} B/s")
88 }
89}
9091impl From<(u64, Duration)> for Throughput {
92fn from(value: (u64, Duration)) -> Self {
93Self {
94 bytes_read: value.0,
95 per_time_elapsed: value.1,
96 }
97 }
98}
/// Overall label for a given bin.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum BinLabel {
    // IMPORTANT: The declaration order of these variants matters. The derived
    // ordering follows it, and that ordering is the label priority:
    // TransferredBytes > Pending > NoPolling > Empty
    //
    /// There is no data in this bin.
    Empty,

    /// No polling took place during this bin.
    NoPolling,

    /// The user/remote was not providing/consuming data fast enough during this bin.
    Pending,

    /// Bytes were transferred during this bin.
    TransferredBytes,
}
118119/// Represents a bin (or a cell) in a linear grid that represents a small chunk of time.
120#[derive(Copy, Clone, Debug)]
121struct Bin {
122 label: BinLabel,
123 bytes: u64,
124}
125126impl Bin {
127const fn new(label: BinLabel, bytes: u64) -> Self {
128Self { label, bytes }
129 }
130const fn empty() -> Self {
131Self::new(BinLabel::Empty, 0)
132 }
133134fn is_empty(&self) -> bool {
135matches!(self.label, BinLabel::Empty)
136 }
137138fn merge(&mut self, other: Bin) -> &mut Self {
139// Assign values based on this priority order (highest priority higher up):
140 // 1. TransferredBytes
141 // 2. Pending
142 // 3. NoPolling
143 // 4. Empty
144self.label = if other.label > self.label {
145 other.label
146 } else {
147self.label
148 };
149self.bytes += other.bytes;
150self
151}
152153/// Number of bytes transferred during this bin
154fn bytes(&self) -> u64 {
155self.bytes
156 }
157}
/// Tally of how many bins carry each label.
#[derive(Clone, Copy, Debug, Default)]
struct BinCounts {
    /// Number of bins with no data.
    empty: usize,
    /// Number of "no polling" bins.
    no_polling: usize,
    /// Number of "bytes transferred" bins.
    transferred: usize,
    /// Number of "pending" bins.
    pending: usize,
}
170171/// Underlying stack-allocated linear grid buffer for tracking
172/// throughput events for [`ThroughputLogs`].
173#[derive(Copy, Clone, Debug)]
174struct LogBuffer<const N: usize> {
175 entries: [Bin; N],
176// The length only needs to exist so that the `fill_gaps` function
177 // can differentiate between `Empty` due to there not having been enough
178 // time to establish a full buffer worth of data vs. `Empty` due to a
179 // polling gap. Once the length reaches N, it will never change again.
180length: usize,
181}
182183impl<const N: usize> LogBuffer<N> {
184fn new() -> Self {
185Self {
186 entries: [Bin::empty(); N],
187 length: 0,
188 }
189 }
190191/// Mutably returns the tail of the buffer.
192 ///
193 /// ## Panics
194 ///
195 /// The buffer MUST have at least one bin in it before this is called.
196fn tail_mut(&mut self) -> &mut Bin {
197debug_assert!(self.length > 0);
198&mut self.entries[self.length - 1]
199 }
200201/// Pushes a bin into the buffer. If the buffer is already full,
202 /// then this will rotate the entire buffer to the left.
203fn push(&mut self, bin: Bin) {
204if self.filled() {
205self.entries.rotate_left(1);
206self.entries[N - 1] = bin;
207 } else {
208self.entries[self.length] = bin;
209self.length += 1;
210 }
211 }
212213/// Returns the total number of bytes transferred within the time window.
214fn bytes_transferred(&self) -> u64 {
215self.entries.iter().take(self.length).map(Bin::bytes).sum()
216 }
217218#[inline]
219fn filled(&self) -> bool {
220self.length == N
221 }
222223/// Fills in missing NoData entries.
224 ///
225 /// We want NoData entries to represent when a future hasn't been polled.
226 /// Since the future is in charge of logging in the first place, the only
227 /// way we can know about these is by examining gaps in time.
228fn fill_gaps(&mut self) {
229for entry in self.entries.iter_mut().take(self.length) {
230if entry.is_empty() {
231*entry = Bin::new(BinLabel::NoPolling, 0);
232 }
233 }
234 }
235236/// Returns the counts of each bin type in the buffer.
237fn counts(&self) -> BinCounts {
238let mut counts = BinCounts::default();
239for entry in &self.entries {
240match entry.label {
241 BinLabel::Empty => counts.empty += 1,
242 BinLabel::NoPolling => counts.no_polling += 1,
243 BinLabel::TransferredBytes => counts.transferred += 1,
244 BinLabel::Pending => counts.pending += 1,
245 }
246 }
247 counts
248 }
249250/// If this LogBuffer is empty, returns `true`. Else, returns `false`.
251fn is_empty(&self) -> bool {
252self.length == 0
253}
254}
255256/// Report/summary of all the events in a time window.
257#[cfg_attr(test, derive(Debug, Eq, PartialEq))]
258pub(crate) enum ThroughputReport {
259/// Not enough data to draw any conclusions. This happens early in a request/response.
260Incomplete,
261/// The stream hasn't been polled for most of this time window.
262NoPolling,
263/// The stream has been waiting for most of the time window.
264Pending,
265/// The stream transferred this amount of throughput during the time window.
266Transferred(Throughput),
267/// The stream has completed, no more data is expected.
268Complete,
269}
/// Number of bins that the throughput log's time window is divided into.
const BIN_COUNT: usize = 10;
272273/// Log of throughput in a request or response stream.
274///
275/// Used to determine if a configured minimum throughput is being met or not
276/// so that a request or response stream can be timed out in the event of a
277/// stall.
278///
279/// Request/response streams push data transfer or pending events to this log
280/// based on what's going on in their poll functions. The log tracks three kinds
281/// of events despite only receiving two: the third is "no polling". The poll
282/// functions cannot know when they're not being polled, so the log examines gaps
283/// in the event history to know when no polling took place.
284///
285/// The event logging is simplified down to a linear grid consisting of 10 "bins",
286/// with each bin representing 1/10th the total time window. When an event is pushed,
287/// it is either merged into the current tail bin, or all the bins are rotated
288/// left to create a new empty tail bin, and then it is merged into that one.
289#[derive(Clone, Debug)]
290pub(super) struct ThroughputLogs {
291 resolution: Duration,
292 current_tail: SystemTime,
293 buffer: LogBuffer<BIN_COUNT>,
294 stream_complete: bool,
295}
296297impl ThroughputLogs {
298/// Creates a new log starting at `now` with the given `time_window`.
299 ///
300 /// Note: the `time_window` gets divided by 10 to create smaller sub-windows
301 /// to track throughput. The time window should be configured to be large enough
302 /// so that these sub-windows aren't too small for network-based events.
303 /// A time window of 10ms probably won't work, but 500ms might. The default
304 /// is one second.
305pub(super) fn new(time_window: Duration, now: SystemTime) -> Self {
306assert!(!time_window.is_zero());
307let resolution = time_window.div_f64(BIN_COUNT as f64);
308Self {
309 resolution,
310 current_tail: now,
311 buffer: LogBuffer::new(),
312 stream_complete: false,
313 }
314 }
315316/// Returns the resolution at which events are logged at.
317 ///
318 /// The resolution is the number of bins in the time window.
319pub(super) fn resolution(&self) -> Duration {
320self.resolution
321 }
322323/// Pushes a "pending" event.
324 ///
325 /// Pending indicates the streaming future is waiting for something.
326 /// In an upload, it is waiting for data from the user, and in a download,
327 /// it is waiting for data from the server.
328pub(super) fn push_pending(&mut self, time: SystemTime) {
329self.push(time, Bin::new(BinLabel::Pending, 0));
330 }
331332/// Pushes a data transferred event.
333 ///
334 /// Indicates that this number of bytes were transferred at this time.
335pub(super) fn push_bytes_transferred(&mut self, time: SystemTime, bytes: u64) {
336self.push(time, Bin::new(BinLabel::TransferredBytes, bytes));
337 }
338339fn push(&mut self, now: SystemTime, value: Bin) {
340self.catch_up(now);
341if self.buffer.is_empty() {
342self.buffer.push(value)
343 } else {
344self.buffer.tail_mut().merge(value);
345 }
346self.buffer.fill_gaps();
347 }
348349/// Pushes empty bins until `current_tail` is caught up to `now`.
350fn catch_up(&mut self, now: SystemTime) {
351while now >= self.current_tail {
352self.current_tail += self.resolution;
353self.buffer.push(Bin::empty());
354 }
355assert!(self.current_tail >= now);
356 }
357358/// Mark the stream complete indicating no more data is expected. This is an
359 /// idempotent operation -- subsequent invocations of this function have no effect
360 /// and return false.
361 ///
362 /// After marking a stream complete [report](#method.report) will forever more return
363 /// [ThroughputReport::Complete]
364pub(super) fn mark_complete(&mut self) -> bool {
365let prev = self.stream_complete;
366self.stream_complete = true;
367 !prev
368 }
369370/// Generates an overall report of the time window.
371pub(super) fn report(&mut self, now: SystemTime) -> ThroughputReport {
372if self.stream_complete {
373return ThroughputReport::Complete;
374 }
375376self.catch_up(now);
377self.buffer.fill_gaps();
378379let BinCounts {
380 empty,
381 no_polling,
382 transferred,
383 pending,
384 } = self.buffer.counts();
385386// If there are any empty cells at all, then we haven't been tracking
387 // long enough to make any judgements about the stream's progress.
388if empty > 0 {
389return ThroughputReport::Incomplete;
390 }
391392let bytes = self.buffer.bytes_transferred();
393let time = self.resolution * (BIN_COUNT - empty) as u32;
394let throughput = Throughput::new(bytes, time);
395396let half = BIN_COUNT / 2;
397match (transferred > 0, no_polling >= half, pending >= half) {
398 (true, _, _) => ThroughputReport::Transferred(throughput),
399 (_, true, _) => ThroughputReport::NoPolling,
400 (_, _, true) => ThroughputReport::Pending,
401_ => ThroughputReport::Incomplete,
402 }
403 }
404}
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    // The label ordering is what `Bin::merge` relies on to pick a winner.
    #[test]
    fn test_log_buffer_bin_label_priority() {
        use BinLabel::*;
        assert!(Empty < NoPolling);
        assert!(NoPolling < Pending);
        assert!(Pending < TransferredBytes);
    }

    // Throughputs with the same rate are equal regardless of how they were built.
    #[test]
    fn test_throughput_eq() {
        let t1 = Throughput::new(1, Duration::from_secs(1));
        let t2 = Throughput::new(25, Duration::from_secs(25));
        let t3 = Throughput::new(100, Duration::from_secs(100));

        assert_eq!(t1, t2);
        assert_eq!(t2, t3);
    }

    #[test]
    fn incomplete_no_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start);
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_entries() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn incomplete_with_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        logs.push_pending(start);
        logs.push_bytes_transferred(start + Duration::from_millis(100), 10);

        let report = logs.report(start + Duration::from_millis(300));
        assert_eq!(ThroughputReport::Incomplete, report);
    }

    #[test]
    fn push_pending_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_pending(now);
            now += logs.resolution();

            assert_eq!(i, logs.buffer.counts().pending);
        }

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_pending_at_the_end_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..BIN_COUNT {
            now += logs.resolution();
            logs.push_pending(now);

            assert_eq!(i, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().transferred);
            assert_eq!(1, logs.buffer.counts().no_polling);
        }
        // This should replace the initial "no polling" bin
        now += logs.resolution();
        logs.push_pending(now);
        assert_eq!(0, logs.buffer.counts().no_polling);

        let report = logs.report(now);
        assert_eq!(ThroughputReport::Pending, report);
    }

    #[test]
    fn push_transferred_at_the_beginning_of_each_tick() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        let mut now = start;
        for i in 1..=BIN_COUNT {
            logs.push_bytes_transferred(now, 10);
            if i != BIN_COUNT {
                now += logs.resolution();
            }

            assert_eq!(i, logs.buffer.counts().transferred);
            assert_eq!(0, logs.buffer.counts().pending);
            assert_eq!(0, logs.buffer.counts().no_polling);
        }

        let report = logs.report(now);
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new(100, Duration::from_secs(1))),
            report
        );
    }

    #[test]
    fn no_polling() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);
        let report = logs.report(start + Duration::from_secs(2));
        assert_eq!(ThroughputReport::NoPolling, report);
    }

    // Transferred bytes MUST take priority over pending when reporting throughput
    #[test]
    fn mixed_bag_mostly_pending() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_bytes_transferred(start + Duration::from_millis(50), 10);
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        logs.push_bytes_transferred(start + Duration::from_millis(350), 10);
        logs.push_pending(start + Duration::from_millis(450));
        // skip 550
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(
            ThroughputReport::Transferred(Throughput::new_bytes_per_second(20)),
            report
        );
    }

    #[test]
    fn mixed_bag_mostly_pending_no_transferred() {
        let start = SystemTime::UNIX_EPOCH;
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        logs.push_pending(start + Duration::from_millis(50));
        logs.push_pending(start + Duration::from_millis(150));
        logs.push_pending(start + Duration::from_millis(250));
        // skip 350
        logs.push_pending(start + Duration::from_millis(450));
        // skip 550
        logs.push_pending(start + Duration::from_millis(650));
        logs.push_pending(start + Duration::from_millis(750));
        logs.push_pending(start + Duration::from_millis(850));

        let report = logs.report(start + Duration::from_millis(999));
        assert_eq!(ThroughputReport::Pending, report);
    }

    // Pushing an event timestamped before the log's start time must not panic.
    #[test]
    fn test_first_push_succeeds_although_time_window_has_not_elapsed() {
        let t0 = SystemTime::UNIX_EPOCH;
        let t1 = t0 + Duration::from_secs(1);
        let mut tl = ThroughputLogs::new(Duration::from_secs(1), t1);

        tl.push_pending(t0);
    }

    #[test]
    fn test_label_transferred_bytes_should_not_be_overwritten_by_pending() {
        let start = SystemTime::UNIX_EPOCH;
        // Each `Bin`'s resolution is 100ms (1s / BIN_COUNT), where `BIN_COUNT` is 10
        let mut logs = ThroughputLogs::new(Duration::from_secs(1), start);

        // push `TransferredBytes` and then `Pending` in the same first `Bin`
        logs.push_bytes_transferred(start + Duration::from_millis(10), 10);
        logs.push_pending(start + Duration::from_millis(20));

        let BinCounts {
            empty,
            no_polling,
            transferred,
            pending,
        } = logs.buffer.counts();

        assert_eq!(9, empty);
        assert_eq!(0, no_polling);
        assert_eq!(1, transferred); // `transferred` should still be there
        assert_eq!(0, pending); // while `pending` should cease to exist, failing to overwrite `transferred`
    }
}