1#![allow(dead_code)]
10
11use crate::client::retries::RetryPartition;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use tracing::debug;
15
16#[non_exhaustive]
18#[derive(Clone, Debug, Hash, PartialEq, Eq)]
19pub struct ClientRateLimiterPartition {
20 retry_partition: RetryPartition,
21}
22
23impl ClientRateLimiterPartition {
24 pub fn new(retry_partition: RetryPartition) -> Self {
26 Self { retry_partition }
27 }
28}
29
// Token cost charged against the bucket for each kind of request.
const INITIAL_REQUEST_COST: f64 = 1.0;
const RETRY_COST: f64 = 5.0;
const RETRY_TIMEOUT_COST: f64 = 2.0 * RETRY_COST;

// Floors applied to the bucket's fill rate and capacity whenever it is resized.
const MIN_FILL_RATE: f64 = 0.5;
const MIN_CAPACITY: f64 = 1.0;

/// Weight of the newest sample when smoothing the measured send rate.
const SMOOTH: f64 = 0.8;
/// Multiplicative decrease applied to the rate on a throttling error.
const BETA: f64 = 0.7;
/// Scaling factor of the cubic rate-growth curve.
const SCALE_CONSTANT: f64 = 0.4;
41
/// Client-side token-bucket rate limiter.
///
/// Cloning is cheap: every clone shares the same state through an `Arc<Mutex<_>>`.
#[derive(Clone, Debug)]
pub struct ClientRateLimiter {
    inner: Arc<Mutex<Inner>>,
}
47
/// Mutable state of a [`ClientRateLimiter`], guarded by its mutex.
///
/// Timestamps are the `seconds_since_unix_epoch` values supplied by callers.
#[derive(Debug)]
pub(crate) struct Inner {
    /// Tokens added to the bucket per second.
    fill_rate: f64,
    /// Upper bound on `current_capacity` after a refill or resize.
    max_capacity: f64,
    /// Tokens currently available; can go negative after a delayed acquire.
    current_capacity: f64,
    /// Time of the last refill, or `None` if the bucket has never been refilled.
    last_timestamp: Option<f64>,
    /// Whether the token bucket is enforced; turned on when a throttling error is reported.
    enabled: bool,
    /// Exponentially smoothed measurement of the request send rate.
    measured_tx_rate: f64,
    /// Time bucket at which `measured_tx_rate` was last updated.
    last_tx_rate_bucket: f64,
    /// Requests counted since `measured_tx_rate` was last updated.
    request_count: u64,
    /// Rate recorded at the last throttle; the cubic curve's target rate.
    last_max_rate: f64,
    /// Time of the last throttling error.
    time_of_last_throttle: f64,
}
73
/// Why a request is being sent; selects how many tokens it costs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RequestReason {
    /// A retried attempt; costs `RETRY_COST`.
    Retry,
    /// A retry attributed to a timeout (per the name — confirm at call sites);
    /// costs `RETRY_TIMEOUT_COST`.
    RetryTimeout,
    /// The first attempt of a request; costs `INITIAL_REQUEST_COST`.
    InitialRequest,
}
79
80impl ClientRateLimiter {
81 pub fn new(seconds_since_unix_epoch: f64) -> Self {
83 Self::builder()
84 .tokens_retrieved_per_second(MIN_FILL_RATE)
85 .time_of_last_throttle(seconds_since_unix_epoch)
86 .previous_time_bucket(seconds_since_unix_epoch.floor())
87 .build()
88 }
89
90 fn builder() -> Builder {
91 Builder::new()
92 }
93
94 pub(crate) fn acquire_permission_to_send_a_request(
95 &self,
96 seconds_since_unix_epoch: f64,
97 kind: RequestReason,
98 ) -> Result<(), Duration> {
99 let mut it = self.inner.lock().unwrap();
100
101 if !it.enabled {
102 return Ok(());
104 }
105 let amount = match kind {
106 RequestReason::Retry => RETRY_COST,
107 RequestReason::RetryTimeout => RETRY_TIMEOUT_COST,
108 RequestReason::InitialRequest => INITIAL_REQUEST_COST,
109 };
110
111 it.refill(seconds_since_unix_epoch);
112
113 let res = if amount > it.current_capacity {
114 let sleep_time = (amount - it.current_capacity) / it.fill_rate;
115 debug!(
116 amount,
117 it.current_capacity,
118 it.fill_rate,
119 sleep_time,
120 "client rate limiter delayed a request"
121 );
122
123 Err(Duration::from_secs_f64(sleep_time))
124 } else {
125 Ok(())
126 };
127
128 it.current_capacity -= amount;
129 res
130 }
131
132 pub(crate) fn update_rate_limiter(
133 &self,
134 seconds_since_unix_epoch: f64,
135 is_throttling_error: bool,
136 ) {
137 let mut it = self.inner.lock().unwrap();
138 it.update_tokens_retrieved_per_second(seconds_since_unix_epoch);
139
140 let calculated_rate;
141 if is_throttling_error {
142 let rate_to_use = if it.enabled {
143 f64::min(it.measured_tx_rate, it.fill_rate)
144 } else {
145 it.measured_tx_rate
146 };
147
148 it.last_max_rate = rate_to_use;
150 it.calculate_time_window();
151 it.time_of_last_throttle = seconds_since_unix_epoch;
152 calculated_rate = cubic_throttle(rate_to_use);
153 it.enable_token_bucket();
154 } else {
155 it.calculate_time_window();
156 calculated_rate = it.cubic_success(seconds_since_unix_epoch);
157 }
158
159 let new_rate = f64::min(calculated_rate, 2.0 * it.measured_tx_rate);
160 it.update_bucket_refill_rate(seconds_since_unix_epoch, new_rate);
161 }
162}
163
164impl Inner {
165 fn refill(&mut self, seconds_since_unix_epoch: f64) {
166 if let Some(last_timestamp) = self.last_timestamp {
167 let fill_amount = (seconds_since_unix_epoch - last_timestamp) * self.fill_rate;
168 self.current_capacity =
169 f64::min(self.max_capacity, self.current_capacity + fill_amount);
170 debug!(
171 fill_amount,
172 self.current_capacity, self.max_capacity, "refilling client rate limiter tokens"
173 );
174 }
175 self.last_timestamp = Some(seconds_since_unix_epoch);
176 }
177
178 fn update_bucket_refill_rate(&mut self, seconds_since_unix_epoch: f64, new_fill_rate: f64) {
179 self.refill(seconds_since_unix_epoch);
181
182 self.fill_rate = f64::max(new_fill_rate, MIN_FILL_RATE);
183 self.max_capacity = f64::max(new_fill_rate, MIN_CAPACITY);
184
185 debug!(
186 fill_rate = self.fill_rate,
187 max_capacity = self.max_capacity,
188 current_capacity = self.current_capacity,
189 measured_tx_rate = self.measured_tx_rate,
190 "client rate limiter state has been updated"
191 );
192
193 self.current_capacity = f64::min(self.current_capacity, self.max_capacity);
195 }
196
197 fn enable_token_bucket(&mut self) {
198 if !self.enabled {
200 debug!("client rate limiting has been enabled");
201 }
202 self.enabled = true;
203 }
204
205 fn update_tokens_retrieved_per_second(&mut self, seconds_since_unix_epoch: f64) {
206 let next_time_bucket = (seconds_since_unix_epoch * 2.0).floor() / 2.0;
207 self.request_count += 1;
208
209 if next_time_bucket > self.last_tx_rate_bucket {
210 let current_rate =
211 self.request_count as f64 / (next_time_bucket - self.last_tx_rate_bucket);
212 self.measured_tx_rate = current_rate * SMOOTH + self.measured_tx_rate * (1.0 - SMOOTH);
213 self.request_count = 0;
214 self.last_tx_rate_bucket = next_time_bucket;
215 }
216 }
217
218 fn calculate_time_window(&self) -> f64 {
219 let base = (self.last_max_rate * (1.0 - BETA)) / SCALE_CONSTANT;
220 base.powf(1.0 / 3.0)
221 }
222
223 fn cubic_success(&self, seconds_since_unix_epoch: f64) -> f64 {
224 let dt =
225 seconds_since_unix_epoch - self.time_of_last_throttle - self.calculate_time_window();
226 (SCALE_CONSTANT * dt.powi(3)) + self.last_max_rate
227 }
228}
229
230fn cubic_throttle(rate_to_use: f64) -> f64 {
231 rate_to_use * BETA
232}
233
/// Builder for [`ClientRateLimiter`]; any field left unset falls back to a default in `build`.
#[derive(Clone, Debug, Default)]
struct Builder {
    /// Initial `fill_rate` (tokens per second); `build` defaults it to `0.0`.
    token_refill_rate: Option<f64>,
    /// Initial `max_capacity`; `build` defaults it to `f64::MAX`.
    maximum_bucket_capacity: Option<f64>,
    /// Initial `current_capacity`; `build` defaults it to `0.0`.
    current_bucket_capacity: Option<f64>,
    /// Initial `last_timestamp`; left as `None` (never refilled) when unset.
    time_of_last_refill: Option<f64>,
    /// Initial `measured_tx_rate`; `build` defaults it to `0.0`.
    tokens_retrieved_per_second: Option<f64>,
    /// Initial `last_tx_rate_bucket`; `build` defaults it to `0.0`.
    previous_time_bucket: Option<f64>,
    /// Initial `request_count`; `build` defaults it to `0`.
    request_count: Option<u64>,
    /// Initial `enabled` flag; `build` defaults it to `false`.
    enable_throttling: Option<bool>,
    /// Initial `last_max_rate`; `build` defaults it to `0.0`.
    tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
    /// Initial `time_of_last_throttle`; `build` defaults it to `0.0`.
    time_of_last_throttle: Option<f64>,
}
257
258impl Builder {
259 fn new() -> Self {
260 Builder::default()
261 }
262 fn set_token_refill_rate(&mut self, token_refill_rate: Option<f64>) -> &mut Self {
264 self.token_refill_rate = token_refill_rate;
265 self
266 }
267 fn token_refill_rate(mut self, token_refill_rate: f64) -> Self {
269 self.token_refill_rate = Some(token_refill_rate);
270 self
271 }
272 fn set_maximum_bucket_capacity(&mut self, maximum_bucket_capacity: Option<f64>) -> &mut Self {
274 self.maximum_bucket_capacity = maximum_bucket_capacity;
275 self
276 }
277 fn maximum_bucket_capacity(mut self, maximum_bucket_capacity: f64) -> Self {
279 self.maximum_bucket_capacity = Some(maximum_bucket_capacity);
280 self
281 }
282 fn set_current_bucket_capacity(&mut self, current_bucket_capacity: Option<f64>) -> &mut Self {
284 self.current_bucket_capacity = current_bucket_capacity;
285 self
286 }
287 fn current_bucket_capacity(mut self, current_bucket_capacity: f64) -> Self {
289 self.current_bucket_capacity = Some(current_bucket_capacity);
290 self
291 }
292 fn set_time_of_last_refill(&mut self, time_of_last_refill: Option<f64>) -> &mut Self {
294 self.time_of_last_refill = time_of_last_refill;
295 self
296 }
297 fn time_of_last_refill(mut self, time_of_last_refill: f64) -> Self {
299 self.time_of_last_refill = Some(time_of_last_refill);
300 self
301 }
302 fn set_tokens_retrieved_per_second(
304 &mut self,
305 tokens_retrieved_per_second: Option<f64>,
306 ) -> &mut Self {
307 self.tokens_retrieved_per_second = tokens_retrieved_per_second;
308 self
309 }
310 fn tokens_retrieved_per_second(mut self, tokens_retrieved_per_second: f64) -> Self {
312 self.tokens_retrieved_per_second = Some(tokens_retrieved_per_second);
313 self
314 }
315 fn set_previous_time_bucket(&mut self, previous_time_bucket: Option<f64>) -> &mut Self {
317 self.previous_time_bucket = previous_time_bucket;
318 self
319 }
320 fn previous_time_bucket(mut self, previous_time_bucket: f64) -> Self {
322 self.previous_time_bucket = Some(previous_time_bucket);
323 self
324 }
325 fn set_request_count(&mut self, request_count: Option<u64>) -> &mut Self {
327 self.request_count = request_count;
328 self
329 }
330 fn request_count(mut self, request_count: u64) -> Self {
332 self.request_count = Some(request_count);
333 self
334 }
335 fn set_enable_throttling(&mut self, enable_throttling: Option<bool>) -> &mut Self {
337 self.enable_throttling = enable_throttling;
338 self
339 }
340 fn enable_throttling(mut self, enable_throttling: bool) -> Self {
342 self.enable_throttling = Some(enable_throttling);
343 self
344 }
345 fn set_tokens_retrieved_per_second_at_time_of_last_throttle(
347 &mut self,
348 tokens_retrieved_per_second_at_time_of_last_throttle: Option<f64>,
349 ) -> &mut Self {
350 self.tokens_retrieved_per_second_at_time_of_last_throttle =
351 tokens_retrieved_per_second_at_time_of_last_throttle;
352 self
353 }
354 fn tokens_retrieved_per_second_at_time_of_last_throttle(
356 mut self,
357 tokens_retrieved_per_second_at_time_of_last_throttle: f64,
358 ) -> Self {
359 self.tokens_retrieved_per_second_at_time_of_last_throttle =
360 Some(tokens_retrieved_per_second_at_time_of_last_throttle);
361 self
362 }
363 fn set_time_of_last_throttle(&mut self, time_of_last_throttle: Option<f64>) -> &mut Self {
365 self.time_of_last_throttle = time_of_last_throttle;
366 self
367 }
368 fn time_of_last_throttle(mut self, time_of_last_throttle: f64) -> Self {
370 self.time_of_last_throttle = Some(time_of_last_throttle);
371 self
372 }
373
374 fn build(self) -> ClientRateLimiter {
375 ClientRateLimiter {
376 inner: Arc::new(Mutex::new(Inner {
377 fill_rate: self.token_refill_rate.unwrap_or_default(),
378 max_capacity: self.maximum_bucket_capacity.unwrap_or(f64::MAX),
379 current_capacity: self.current_bucket_capacity.unwrap_or_default(),
380 last_timestamp: self.time_of_last_refill,
381 enabled: self.enable_throttling.unwrap_or_default(),
382 measured_tx_rate: self.tokens_retrieved_per_second.unwrap_or_default(),
383 last_tx_rate_bucket: self.previous_time_bucket.unwrap_or_default(),
384 request_count: self.request_count.unwrap_or_default(),
385 last_max_rate: self
386 .tokens_retrieved_per_second_at_time_of_last_throttle
387 .unwrap_or_default(),
388 time_of_last_throttle: self.time_of_last_throttle.unwrap_or_default(),
389 })),
390 }
391 }
392}
393
#[cfg(test)]
mod tests {
    use super::{cubic_throttle, ClientRateLimiter, RequestReason};
    use approx::assert_relative_eq;
    use aws_smithy_async::rt::sleep::AsyncSleep;
    use aws_smithy_async::test_util::instant_time_and_sleep;
    use std::time::{Duration, SystemTime};

    const TWO_HUNDRED_MILLISECONDS: Duration = Duration::from_millis(200);

    #[test]
    fn should_match_beta_decrease() {
        let new_rate = cubic_throttle(10.0);
        assert_relative_eq!(new_rate, 7.0);

        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(1.0)
            .build();

        let inner = rate_limiter.inner.lock().unwrap();
        // At the moment of the throttle the cubic curve sits at BETA * last_max_rate...
        assert_relative_eq!(inner.cubic_success(1.0), 7.0);
        // ...and it climbs back to last_max_rate once the time window has elapsed.
        let time_window = inner.calculate_time_window();
        assert_relative_eq!(inner.cubic_success(1.0 + time_window), 10.0);
    }

    #[test]
    fn throttling_is_enabled_once_throttling_error_is_received() {
        let rate_limiter = ClientRateLimiter::builder()
            .previous_time_bucket(0.0)
            .time_of_last_throttle(0.0)
            .build();

        assert!(
            !rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be disabled by default"
        );
        rate_limiter.update_rate_limiter(0.0, true);
        assert!(
            rate_limiter.inner.lock().unwrap().enabled,
            "rate_limiter should be enabled after throttling error"
        );
    }

    #[test]
    fn test_calculated_rate_with_successes() {
        let rate_limiter = ClientRateLimiter::builder()
            .time_of_last_throttle(5.0)
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .build();

        struct Attempt {
            seconds_since_unix_epoch: f64,
            expected_calculated_rate: f64,
        }

        let attempts = [
            Attempt {
                seconds_since_unix_epoch: 5.0,
                expected_calculated_rate: 7.0,
            },
            Attempt {
                seconds_since_unix_epoch: 6.0,
                expected_calculated_rate: 9.64893600966,
            },
            Attempt {
                seconds_since_unix_epoch: 7.0,
                expected_calculated_rate: 10.000030849917364,
            },
            Attempt {
                seconds_since_unix_epoch: 8.0,
                expected_calculated_rate: 10.453284520772092,
            },
            Attempt {
                seconds_since_unix_epoch: 9.0,
                expected_calculated_rate: 13.408697022224185,
            },
            Attempt {
                seconds_since_unix_epoch: 10.0,
                expected_calculated_rate: 21.26626835427364,
            },
            Attempt {
                seconds_since_unix_epoch: 11.0,
                expected_calculated_rate: 36.425998516920465,
            },
        ];

        for attempt in attempts {
            let calculated_rate = rate_limiter
                .inner
                .lock()
                .unwrap()
                .cubic_success(attempt.seconds_since_unix_epoch);

            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
        }
    }

    #[test]
    fn test_calculated_rate_with_throttles() {
        let rate_limiter = ClientRateLimiter::builder()
            .tokens_retrieved_per_second_at_time_of_last_throttle(10.0)
            .time_of_last_throttle(5.0)
            .build();

        struct Attempt {
            throttled: bool,
            seconds_since_unix_epoch: f64,
            expected_calculated_rate: f64,
        }

        let attempts = [
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 5.0,
                expected_calculated_rate: 7.0,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 6.0,
                expected_calculated_rate: 9.64893600966,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 7.0,
                expected_calculated_rate: 6.754255206761999,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 8.0,
                expected_calculated_rate: 4.727978644733399,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 9.0,
                expected_calculated_rate: 4.670125557970046,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 10.0,
                expected_calculated_rate: 4.770870456867401,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 11.0,
                expected_calculated_rate: 6.011819748005445,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 12.0,
                expected_calculated_rate: 10.792973431384178,
            },
        ];

        let mut calculated_rate = 0.0;
        for attempt in attempts {
            let mut inner = rate_limiter.inner.lock().unwrap();
            if attempt.throttled {
                calculated_rate = cubic_throttle(calculated_rate);
                inner.time_of_last_throttle = attempt.seconds_since_unix_epoch;
                // NOTE(review): this records the *decreased* rate as last_max_rate, while
                // `update_rate_limiter` records the pre-decrease `rate_to_use` — confirm
                // which model these expected values are meant to follow.
                inner.last_max_rate = calculated_rate;
            } else {
                calculated_rate = inner.cubic_success(attempt.seconds_since_unix_epoch);
            }

            assert_relative_eq!(attempt.expected_calculated_rate, calculated_rate);
        }
    }

    #[tokio::test]
    async fn test_client_sending_rates() {
        let (_, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let rate_limiter = ClientRateLimiter::builder().build();

        struct Attempt {
            throttled: bool,
            seconds_since_unix_epoch: f64,
            expected_tokens_retrieved_per_second: f64,
            expected_token_refill_rate: f64,
        }

        let attempts = [
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.2,
                expected_tokens_retrieved_per_second: 0.000000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.4,
                expected_tokens_retrieved_per_second: 0.000000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.6,
                expected_tokens_retrieved_per_second: 4.800000000000001,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 0.8,
                expected_tokens_retrieved_per_second: 4.800000000000001,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.0,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 0.500000,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.2,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 0.691200,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.4,
                expected_tokens_retrieved_per_second: 4.160000,
                expected_token_refill_rate: 1.0975999999999997,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.6,
                expected_tokens_retrieved_per_second: 5.632000000000001,
                expected_token_refill_rate: 1.6384000000000005,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 1.8,
                expected_tokens_retrieved_per_second: 5.632000000000001,
                expected_token_refill_rate: 2.332800,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 2.0,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.0284799999999996,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.2,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.48663917347026,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.4,
                expected_tokens_retrieved_per_second: 4.326400,
                expected_token_refill_rate: 3.821874416040255,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.6,
                expected_tokens_retrieved_per_second: 5.665280,
                expected_token_refill_rate: 4.053385727709987,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 2.8,
                expected_tokens_retrieved_per_second: 5.665280,
                expected_token_refill_rate: 4.200373108479454,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 3.0,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 4.282036558348658,
            },
            Attempt {
                throttled: true,
                seconds_since_unix_epoch: 3.2,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 2.99742559084406,
            },
            Attempt {
                throttled: false,
                seconds_since_unix_epoch: 3.4,
                expected_tokens_retrieved_per_second: 4.333056,
                expected_token_refill_rate: 3.4522263943863463,
            },
        ];

        for attempt in attempts {
            sleep_impl.sleep(TWO_HUNDRED_MILLISECONDS).await;
            assert_eq!(
                attempt.seconds_since_unix_epoch,
                sleep_impl.total_duration().as_secs_f64()
            );

            rate_limiter.update_rate_limiter(attempt.seconds_since_unix_epoch, attempt.throttled);
            assert_relative_eq!(
                attempt.expected_tokens_retrieved_per_second,
                rate_limiter.inner.lock().unwrap().measured_tx_rate
            );
            assert_relative_eq!(
                attempt.expected_token_refill_rate,
                rate_limiter.inner.lock().unwrap().fill_rate
            );
        }
    }

    #[tokio::test]
    async fn test_when_throttling_is_enabled_requests_can_still_be_sent() {
        let (time_source, sleep_impl) = instant_time_and_sleep(SystemTime::UNIX_EPOCH);
        let crl = ClientRateLimiter::builder()
            .time_of_last_throttle(0.0)
            .previous_time_bucket(0.0)
            .build();

        // A throttling error switches the token bucket on.
        crl.update_rate_limiter(0.0, true);

        for _ in 0..100 {
            // Wait a random sub-second interval between requests.
            let duration = Duration::from_secs_f64(fastrand::f64());
            sleep_impl.sleep(duration).await;
            if let Err(delay) = crl.acquire_permission_to_send_a_request(
                time_source.seconds_since_unix_epoch(),
                RequestReason::InitialRequest,
            ) {
                sleep_impl.sleep(delay).await;
            }

            crl.update_rate_limiter(time_source.seconds_since_unix_epoch(), false);
        }

        let inner = crl.inner.lock().unwrap();
        assert!(inner.enabled, "the rate limiter should still be enabled");
        // The last refill happened at (roughly) the final simulated time.
        assert_relative_eq!(
            inner.last_timestamp.unwrap(),
            sleep_impl.total_duration().as_secs_f64(),
            max_relative = 0.0001
        );
    }
}