aws_smithy_runtime/client/retries/
client_rate_limiter.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! A rate limiter for controlling the rate at which AWS requests are made. The rate changes based
7//! on the number of throttling errors encountered.
8
9#![allow(dead_code)]
10
11use crate::client::retries::RetryPartition;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use tracing::debug;
15
16/// Represents a partition for the rate limiter, e.g. an endpoint, a region
17#[non_exhaustive]
18#[derive(Clone, Debug, Hash, PartialEq, Eq)]
19pub struct ClientRateLimiterPartition {
20    retry_partition: RetryPartition,
21}
22
23impl ClientRateLimiterPartition {
24    /// Creates a `ClientRateLimiterPartition` from the given [`RetryPartition`]
25    pub fn new(retry_partition: RetryPartition) -> Self {
26        Self { retry_partition }
27    }
28}
29
/// Tokens charged to the bucket for a `RequestReason::Retry` request.
const RETRY_COST: f64 = 5.0;
/// Tokens charged for a `RequestReason::RetryTimeout` request; twice the regular retry cost.
const RETRY_TIMEOUT_COST: f64 = RETRY_COST * 2.0;
/// Tokens charged for a `RequestReason::InitialRequest` request.
const INITIAL_REQUEST_COST: f64 = 1.0;

/// Lower bound applied to the bucket's fill rate whenever the refill rate is updated.
const MIN_FILL_RATE: f64 = 0.5;
/// Lower bound applied to the bucket's maximum capacity whenever the refill rate is updated.
const MIN_CAPACITY: f64 = 1.0;
/// Weight given to the newest rate sample when smoothing the measured send rate; the
/// previous smoothed value is weighted by `1.0 - SMOOTH`.
const SMOOTH: f64 = 0.8;
/// How much to scale back after receiving a throttling response
const BETA: f64 = 0.7;
/// Controls how aggressively we scale up after being throttled
const SCALE_CONSTANT: f64 = 0.4;
41
/// Rate limiter for adaptive retry.
///
/// Clones share the same underlying state: the state lives behind an `Arc<Mutex<_>>`, so
/// cloning only copies the handle.
#[derive(Clone, Debug)]
pub struct ClientRateLimiter {
    /// Shared, lock-protected token bucket and rate-measurement state.
    inner: Arc<Mutex<Inner>>,
}
47
/// Mutable state of the rate limiter: a token bucket plus the measurements used to adapt
/// its fill rate. All timestamps are expressed in seconds since the Unix epoch.
#[derive(Debug)]
pub(crate) struct Inner {
    /// The rate at which tokens are replenished.
    fill_rate: f64,
    /// The maximum capacity allowed in the token bucket.
    max_capacity: f64,
    /// The current capacity of the token bucket.
    current_capacity: f64,
    /// The last time the token bucket was refilled.
    /// `None` means no refill has been recorded yet.
    last_timestamp: Option<f64>,
    /// Boolean indicating if the token bucket is enabled.
    /// The token bucket is initially disabled.
    /// When a throttling error is encountered it is enabled.
    enabled: bool,
    /// The smoothed rate at which tokens are being retrieved.
    measured_tx_rate: f64,
    /// The last half second time bucket used.
    last_tx_rate_bucket: f64,
    /// The number of requests seen within the current time bucket.
    request_count: u64,
    /// The maximum rate when the client was last throttled.
    last_max_rate: f64,
    /// The last time when the client was throttled.
    time_of_last_throttle: f64,
}
73
/// Why a request is asking the rate limiter for permission to be sent.
///
/// The reason selects how many tokens the request is charged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RequestReason {
    /// A retry; charged `RETRY_COST` tokens.
    Retry,
    /// A retry charged at the higher `RETRY_TIMEOUT_COST` rate.
    RetryTimeout,
    /// An initial request; charged `INITIAL_REQUEST_COST` tokens.
    InitialRequest,
}
79
80impl ClientRateLimiter {
81    /// Creates a new `ClientRateLimiter`
82    pub fn new(seconds_since_unix_epoch: f64) -> Self {
83        Self::builder()
84            .tokens_retrieved_per_second(MIN_FILL_RATE)
85            .time_of_last_throttle(seconds_since_unix_epoch)
86            .previous_time_bucket(seconds_since_unix_epoch.floor())
87            .build()
88    }
89
90    fn builder() -> Builder {
91        Builder::new()
92    }
93
94    pub(crate) fn acquire_permission_to_send_a_request(
95        &self,
96        seconds_since_unix_epoch: f64,
97        kind: RequestReason,
98    ) -> Result<(), Duration> {
99        let mut it = self.inner.lock().unwrap();
100
101        if !it.enabled {
102            // return early if we haven't encountered a throttling error yet
103            return Ok(());
104        }
105        let amount = match kind {
106            RequestReason::Retry => RETRY_COST,
107            RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
108            RequestReason::InitialRequest => INITIAL_REQUEST_COST,
109        };
110
111        it.refill(seconds_since_unix_epoch);
112
113        let res = if amount > it.current_capacity {
114            let sleep_time = (amount - it.current_capacity) / it.fill_rate;
115            debug!(
116                amount,
117                it.current_capacity,
118                it.fill_rate,
119                sleep_time,
120                "client rate limiter delayed a request"
121            );
122
123            Err(Duration::from_secs_f64(sleep_time))
124        } else {
125            Ok(())
126        };
127
128        it.current_capacity -= amount;
129        res
130    }
131
132    pub(crate) fn update_rate_limiter(
133        &self,
134        seconds_since_unix_epoch: f64,
135        is_throttling_error: bool,
136    ) {
137        let mut it = self.inner.lock().unwrap();
138        it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
139
140        let calculated_rate;
141        if is_throttling_error {
142            let rate_to_use = if it.enabled {
143                f64::min(it.measured_tx_rate, it.fill_rate)
144            } else {
145                it.measured_tx_rate
146            };
147
148            // The fill_rate is from the token bucket
149            it.last_max_rate = rate_to_use;
150            it.calculate_time_window();
151            it.time_of_last_throttle = seconds_since_unix_epoch;
152            calculated_rate = cubic_throttle(rate_to_use);
153            it.enable_token_bucket();
154        } else {
155            it.calculate_time_window();
156            calculated_rate = it.cubic_success(seconds_since_unix_epoch);
157        }
158
159        let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
160        it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
161    }
162}
163
164impl Inner {
165    fn refill(&mut self, seconds_since_unix_epoch: f64) {
166        if let Some(last_timestamp) = self.last_timestamp {
167            let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
168            self.current_capacity =
169                f64::min(self.max_capacity, self.current_capacity + fill_amount);
170            debug!(
171                fill_amount,
172                self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
173            );
174        }
175        self.last_timestamp = Some(seconds_since_unix_epoch);
176    }
177
178    fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
179        // Refill based on our current rate before we update to the new fill rate.
180        self.refill(seconds_since_unix_epoch);
181
182        self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
183        self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
184
185        debug!(
186            fill_rate = self.fill_rate,
187            max_capacity = self.max_capacity,
188            current_capacity = self.current_capacity,
189            measured_tx_rate = self.measured_tx_rate,
190            "client rate limiter state has been updated"
191        );
192
193        // When we scale down we can't have a current capacity that exceeds our max_capacity.
194        self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
195    }
196
197    fn enable_token_bucket(&mut self) {
198        // If throttling wasn't already enabled, note that we're now enabling it.
199        if !self.enabled {
200            debug!("client rate limiting has been enabled");
201        }
202        self.enabled = true;
203    }
204
205    fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
206        let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
207        self.request_count += 1;
208
209        if next_time_bucket > self.last_tx_rate_bucket {
210            let current_rate =
211                self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
212            self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
213            self.request_count = 0;
214            self.last_tx_rate_bucket = next_time_bucket;
215        }
216    }
217
218    fn calculate_time_window(&self) -> f64 {
219        let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
220        base.powf(1.0 / 3.0)
221    }
222
223    fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
224        let dt =
225            seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
226        (SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
227    }
228}
229
230fn cubic_throttle(rate_to_use: f64) -> f64 {
231    rate_to_use * BETA
232}
233
234#[derive(Clone, Debug, Default)]
235struct Builder {
236    ///The rate at which token are replenished.
237    token_refill_rate: Option<f64>,
238    ///The maximum capacity allowed in the token bucket.
239    maximum_bucket_capacity: Option<f64>,
240    ///The current capacity of the token bucket. The minimum this can be is 1.0
241    current_bucket_capacity: Option<f64>,
242    ///The last time the token bucket was refilled.
243    time_of_last_refill: Option<f64>,
244    ///The smoothed rate which tokens are being retrieved.
245    tokens_retrieved_per_second: Option<f64>,
246    ///The last half second time bucket used.
247    previous_time_bucket: Option<f64>,
248    ///The number of requests seen within the current time bucket.
249    request_count: Option<u64>,
250    ///Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
251    enable_throttling: Option<bool>,
252    ///The maximum rate when the client was last throttled.
253    tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
254    ///The last time when the client was throttled.
255    time_of_last_throttle: Option<f64>,
256}
257
258impl Builder {
259    fn new() -> Self {
260        Builder::default()
261    }
262    ///The rate at which token are replenished.
263    fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
264        self.token_refill_rate = token_refill_rate;
265        self
266    }
267    ///The rate at which token are replenished.
268    fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
269        self.token_refill_rate = Some(token_refill_rate);
270        self
271    }
272    ///The maximum capacity allowed in the token bucket.
273    fn set_maximum_bucket_capacity(&mut self, maximum_bucket_capacity: Option<f64>) -> &mut Self {
274        self.maximum_bucket_capacity = maximum_bucket_capacity;
275        self
276    }
277    ///The maximum capacity allowed in the token bucket.
278    fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
279        self.maximum_bucket_capacity = Some(maximum_bucket_capacity);
280        self
281    }
282    ///The current capacity of the token bucket. The minimum this can be is 1.0
283    fn set_current_bucket_capacity(&mut self, current_bucket_capacity: Option<f64>) -> &mut Self {
284        self.current_bucket_capacity = current_bucket_capacity;
285        self
286    }
287    ///The current capacity of the token bucket. The minimum this can be is 1.0
288    fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
289        self.current_bucket_capacity = Some(current_bucket_capacity);
290        self
291    }
292    ///The last time the token bucket was refilled.
293    fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
294        self.time_of_last_refill = time_of_last_refill;
295        self
296    }
297    ///The last time the token bucket was refilled.
298    fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
299        self.time_of_last_refill = Some(time_of_last_refill);
300        self
301    }
302    ///The smoothed rate which tokens are being retrieved.
303    fn set_tokens_retrieved_per_second(
304        &mut self,
305        tokens_retrieved_per_second: Option<f64>,
306    ) -> &mut Self {
307        self.tokens_retrieved_per_second = tokens_retrieved_per_second;
308        self
309    }
310    ///The smoothed rate which tokens are being retrieved.
311    fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
312        self.tokens_retrieved_per_second = Some(tokens_retrieved_per_second);
313        self
314    }
315    ///The last half second time bucket used.
316    fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
317        self.previous_time_bucket = previous_time_bucket;
318        self
319    }
320    ///The last half second time bucket used.
321    fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
322        self.previous_time_bucket = Some(previous_time_bucket);
323        self
324    }
325    ///The number of requests seen within the current time bucket.
326    fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
327        self.request_count = request_count;
328        self
329    }
330    ///The number of requests seen within the current time bucket.
331    fn request_count(mut self, request_count: u64) -> Self {
332        self.request_count = Some(request_count);
333        self
334    }
335    ///Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
336    fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
337        self.enable_throttling = enable_throttling;
338        self
339    }
340    ///Boolean indicating if the token bucket is enabled. The token bucket is initially disabled. When a throttling error is encountered it is enabled.
341    fn enable_throttling(mut self, enable_throttling: bool) -> Self {
342        self.enable_throttling = Some(enable_throttling);
343        self
344    }
345    ///The maximum rate when the client was last throttled.
346    fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
347        &mut self,
348        tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
349    ) -> &mut Self {
350        self.tokens_retrieved_per_second_at_time_of_last_throttle =
351            tokens_retrieved_per_second_at_time_of_last_throttle;
352        self
353    }
354    ///The maximum rate when the client was last throttled.
355    fn tokens_retrieved_per_second_at_time_of_last_throttle(
356        mut self,
357        tokens_retrieved_per_second_at_time_of_last_throttle: f64,
358    ) -> Self {
359        self.tokens_retrieved_per_second_at_time_of_last_throttle =
360            Some(tokens_retrieved_per_second_at_time_of_last_throttle);
361        self
362    }
363    ///The last time when the client was throttled.
364    fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
365        self.time_of_last_throttle = time_of_last_throttle;
366        self
367    }
368    ///The last time when the client was throttled.
369    fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
370        self.time_of_last_throttle = Some(time_of_last_throttle);
371        self
372    }
373
374    fn build(self) -> ClientRateLimiter {
375        ClientRateLimiter {
376            inner: Arc::new(Mutex::new(Inner {
377                fill_rate: self.token_refill_rate.unwrap_or_default(),
378                max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
379                current_capacity: self.current_bucket_capacity.unwrap_or_default(),
380                last_timestamp: self.time_of_last_refill,
381                enabled: self.enable_throttling.unwrap_or_default(),
382                measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
383                last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
384                request_count: self.request_count.unwrap_or_default(),
385                last_max_rate: self
386                    .tokens_retrieved_per_second_at_time_of_last_throttle
387                    .unwrap_or_default(),
388                time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
389            })),
390        }
391    }
392}
393
// Unit tests for the cubic rate calculations, the send-rate measurement, and the token bucket.
#[cfg(test)]
mod tests {
    use super::{cubic_throttle, ClientRateLimiter};
    use crate::client::retries::client_rate_limiter::RequestReason;
    use approx::assert_relative_eq;
    use aws_smithy_async::rt::sleep::AsyncSleep;
    use aws_smithy_async::test_util::instant_time_and_sleep;
    use std::time::{Duration, SystemTime};

    const ONE_SECOND: Duration = Duration::from_secs(1);
    const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);

    // A throttle scales the rate by BETA, and `cubic_success` evaluated at the very moment of
    // the last throttle yields that same scaled-back rate.
    #[test]
    fn should_match_beta_decrease() {
        let new_rate = cubic_throttle(10.0);
        assert_relative_eq!(new_rate, 7.0);

        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(1.0)
            .build();

        // The return value is discarded here; `cubic_success` computes the window itself.
        rate_limiter.inner.lock().unwrap().calculate_time_window();
        let new_rate = rate_limiter.inner.lock().unwrap().cubic_success(1.0);
        assert_relative_eq!(new_rate, 7.0);
    }

    // The limiter starts out disabled and becomes enabled on the first throttling error.
    #[tokio::test]
    async fn throttling_is_enabled_once_throttling_error_is_received() {
        let rate_limiter = ClientRateLimiter::builder()
            .previous_time_bucket(0.0)
            .time_of_last_throttle(0.0)
            .build();

        assert!(
            !rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be disabled by default"
        );
        rate_limiter.update_rate_limiter(0.0, true);
        assert!(
            rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be enabled after throttling error"
        );
    }

    // Checks `cubic_success` against a table of expected rates for successive timestamps
    // following a throttle at t = 5.0 with a last max rate of 10.0.
    #[tokio::test]
    async fn test_calculated_rate_with_successes() {
        let rate_limiter = ClientRateLimiter::builder()
            .time_of_last_throttle(5.0)
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .build();

        struct Attempt {
            seconds_since_unix_epoch: f64,
            expected_calculated_rate: f64,
        }

        let attempts = [
            Attempt {
                seconds_since_unix_epoch: 5.0,
                expected_calculated_rate: 7.0,
            },
            Attempt {
                seconds_since_unix_epoch: 6.0,
                expected_calculated_rate: 9.64893600966,
            },
            Attempt {
                seconds_since_unix_epoch: 7.0,
                expected_calculated_rate: 10.000030849917364,
            },
            Attempt {
                seconds_since_unix_epoch: 8.0,
                expected_calculated_rate: 10.453284520772092,
            },
            Attempt {
                seconds_since_unix_epoch: 9.0,
                expected_calculated_rate: 13.408697022224185,
            },
            Attempt {
                seconds_since_unix_epoch: 10.0,
                expected_calculated_rate: 21.26626835427364,
            },
            Attempt {
                seconds_since_unix_epoch: 11.0,
                expected_calculated_rate: 36.425998516920465,
            },
        ];

        // Think this test is a little strange? I ported the test from Go v2, and this is how it
        // was implemented. See for yourself:
        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
        for attempt in attempts {
            rate_limiter.inner.lock().unwrap().calculate_time_window();
            let calculated_rate = rate_limiter
                .inner
                .lock()
                .unwrap()
                .cubic_success(attempt.seconds_since_unix_epoch);

            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
        }
    }

    // Same idea as above, but with throttles mixed in: each throttle scales the running rate
    // by BETA and resets the throttle time and last max rate on the shared state.
    #[tokio::test]
    async fn test_calculated_rate_with_throttles() {
        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(5.0)
            .build();

        struct Attempt {
            throttled: bool,
            seconds_since_unix_epoch: f64,
            expected_calculated_rate: f64,
        }

        let attempts = [
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 5.0,
                expected_calculated_rate: 7.0,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 6.0,
                expected_calculated_rate: 9.64893600966,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 7.0,
                expected_calculated_rate: 6.754255206761999,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 8.0,
                expected_calculated_rate: 4.727978644733399,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 9.0,
                expected_calculated_rate: 4.670125557970046,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 10.0,
                expected_calculated_rate: 4.770870456867401,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 11.0,
                expected_calculated_rate: 6.011819748005445,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 12.0,
                expected_calculated_rate: 10.792973431384178,
            },
        ];

        // Think this test is a little strange? I ported the test from Go v2, and this is how it
        // was implemented. See for yourself:
        // https://github.com/aws/aws-sdk-go-v2/blob/844ff45cdc76182229ad098c95bf3f5ab8c20e9f/aws/retry/adaptive_ratelimit_test.go#L97
        let mut calculated_rate = 0.0;
        for attempt in attempts {
            let mut inner = rate_limiter.inner.lock().unwrap();
            inner.calculate_time_window();
            if attempt.throttled {
                // The throttle is applied to the rate produced by the previous attempt.
                calculated_rate = cubic_throttle(calculated_rate);
                inner.time_of_last_throttle = attempt.seconds_since_unix_epoch;
                inner.last_max_rate = calculated_rate;
            } else {
                calculated_rate = inner.cubic_success(attempt.seconds_since_unix_epoch);
            };

            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
        }
    }

    // Drives `update_rate_limiter` every 200ms of simulated time and checks both the measured
    // send rate and the resulting fill rate after each update.
    #[tokio::test]
    async fn test_client_sending_rates() {
        let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let rate_limiter = ClientRateLimiter::builder().build();

        struct Attempt {
            throttled: bool,
            seconds_since_unix_epoch: f64,
            expected_tokens_retrieved_per_second: f64,
            expected_token_refill_rate: f64,
        }

        let attempts = [
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.2,
                expected_tokens_retrieved_per_second: 0.000000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.4,
                expected_tokens_retrieved_per_second: 0.000000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.6,
                expected_tokens_retrieved_per_second: 4.800000000000001,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.8,
                expected_tokens_retrieved_per_second: 4.800000000000001,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.0,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.2,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 0.691200,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.4,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 1.0975999999999997,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.6,
                expected_tokens_retrieved_per_second: 5.632000000000001,
                expected_token_refill_rate: 1.6384000000000005,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.8,
                expected_tokens_retrieved_per_second: 5.632000000000001,
                expected_token_refill_rate: 2.332800,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 2.0,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.0284799999999996,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.2,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.48663917347026,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.4,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.821874416040255,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.6,
                expected_tokens_retrieved_per_second: 5.665280,
                expected_token_refill_rate: 4.053385727709987,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.8,
                expected_tokens_retrieved_per_second: 5.665280,
                expected_token_refill_rate: 4.200373108479454,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 3.0,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 4.282036558348658,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 3.2,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 2.99742559084406,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 3.4,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 3.4522263943863463,
            },
        ];

        for attempt in attempts {
            // Advance the simulated clock and confirm it lines up with the table's timestamp.
            sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
            assert_eq!(
                attempt.seconds_since_unix_epoch,
                sleep_impl.total_duration().as_secs_f64()
            );

            rate_limiter.update_rate_limiter(attempt.seconds_since_unix_epoch, attempt.throttled);
            assert_relative_eq!(
                attempt.expected_tokens_retrieved_per_second,
                rate_limiter.inner.lock().unwrap().measured_tx_rate
            );
            assert_relative_eq!(
                attempt.expected_token_refill_rate,
                rate_limiter.inner.lock().unwrap().fill_rate
            );
        }
    }

    // This test is only testing that we don't fail basic math and panic. It does include an
    // element of randomness, but no duration between >= 0.0s and <= 1.0s will ever cause a panic.
    //
    // Because the cost of sending an individual request is 1.0, and because the minimum capacity is
    // also 1.0, we will never encounter a situation where we run out of tokens.
    #[tokio::test]
    async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
        let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let crl = ClientRateLimiter::builder()
            .time_of_last_throttle(0.0)
            .previous_time_bucket(0.0)
            .build();

        // Start by recording a throttling error
        crl.update_rate_limiter(0.0, true);

        for _i in 0..100 {
            // advance time by a random amount (up to 1s) each iteration
            let duration = Duration::from_secs_f64(fastrand::f64());
            sleep_impl.sleep(duration).await;
            // If the limiter asks us to wait, wait for exactly the delay it returned.
            if let Err(delay) = crl.acquire_permission_to_send_a_request(
                time_source.seconds_since_unix_epoch(),
                RequestReason::InitialRequest,
            ) {
                sleep_impl.sleep(delay).await;
            }

            // Assume all further requests succeed on the first try
            crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
        }

        let inner = crl.inner.lock().unwrap();
        assert!(inner.enabled, "the rate limiter should still be enabled");
        // Assert that the rate limiter respects the passage of time.
        assert_relative_eq!(
            inner.last_timestamp.unwrap(),
            sleep_impl.total_duration().as_secs_f64(),
            max_relative = 0.0001
        );
    }
}
748}