use std::sync::Mutex;
use std::time::{Duration, SystemTime};

use tokio::sync::OwnedSemaphorePermit;
use tracing::{debug, trace};

use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::{
    BeforeTransmitInterceptorContextMut, InterceptorContext,
};
use aws_smithy_runtime_api::client::interceptors::Intercept;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};

use crate::client::retries::classifiers::run_classifiers_on_ctx;
use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
use crate::client::retries::strategy::standard::ReleaseResult::{
    APermitWasReleased, NoPermitWasReleased,
};
use crate::client::retries::token_bucket::TokenBucket;
use crate::client::retries::{ClientRateLimiterPartition, RetryPartition};
use crate::static_partition_map::StaticPartitionMap;

static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
    StaticPartitionMap::new();

static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();

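/// Retry strategy with exponential backoff, max attempts, and a token bucket.
///
/// A minimal sketch of storing the strategy in a config layer (it implements
/// [`Storable`] via [`StoreReplace`]); marked `ignore` since wiring up a full
/// client is out of scope here:
///
/// ```ignore
/// let mut layer = Layer::new("retry");
/// layer.store_put(StandardRetryStrategy::new());
/// ```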
#[derive(Debug, Default)]
pub struct StandardRetryStrategy {
    retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
}

impl Storable for StandardRetryStrategy {
    type Storer = StoreReplace<Self>;
}

impl StandardRetryStrategy {
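    /// Creates a new standard retry strategy.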
    pub fn new() -> Self {
        Default::default()
    }

    fn release_retry_permit(&self) -> ReleaseResult {
        let mut retry_permit = self.retry_permit.lock().unwrap();
        match retry_permit.take() {
            Some(p) => {
                drop(p);
                APermitWasReleased
            }
            None => NoPermitWasReleased,
        }
    }

    fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
        let mut old_retry_permit = self.retry_permit.lock().unwrap();
        if let Some(p) = old_retry_permit.replace(new_retry_permit) {
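            // When a new permit replaces an old one, "forget" the old permit so that
            // it is permanently removed from the bucket rather than released back.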
            p.forget()
        }
    }

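    /// Returns the rate limiter for the configured retry partition when retries are in
    /// [`RetryMode::Adaptive`] (and a time source is available); otherwise returns `None`.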
    fn adaptive_retry_rate_limiter(
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Option<ClientRateLimiter> {
        let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
        if retry_config.mode() == RetryMode::Adaptive {
            if let Some(time_source) = runtime_components.time_source() {
                let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
                let seconds_since_unix_epoch = time_source
                    .now()
                    .duration_since(SystemTime::UNIX_EPOCH)
                    .expect("the present takes place after the UNIX_EPOCH")
                    .as_secs_f64();
                let client_rate_limiter_partition =
                    ClientRateLimiterPartition::new(retry_partition.clone());
                let client_rate_limiter = CLIENT_RATE_LIMITER
                    .get_or_init(client_rate_limiter_partition, || {
                        ClientRateLimiter::new(seconds_since_unix_epoch)
                    });
                return Some(client_rate_limiter);
            }
        }
        None
    }

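    /// Determines the delay before the next attempt, preferring (in order) an explicit
    /// server-requested delay, a delay requested by the adaptive rate limiter, and
    /// finally jittered exponential backoff; returns `Err(ShouldAttempt::No)` when the
    /// classifier indicated the error is not retryable.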
    fn calculate_backoff(
        &self,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
        retry_cfg: &RetryConfig,
        retry_reason: &RetryAction,
    ) -> Result<Duration, ShouldAttempt> {
        let request_attempts = cfg
            .load::<RequestAttempts>()
            .expect("at least one request attempt is made before any retry is attempted")
            .attempts();

        match retry_reason {
            RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
                if let Some(delay) = *retry_after {
                    let delay = delay.min(retry_cfg.max_backoff());
                    debug!("explicit request from server to delay {delay:?} before retrying");
                    Ok(delay)
                } else if let Some(delay) =
                    check_rate_limiter_for_delay(runtime_components, cfg, *kind)
                {
                    let delay = delay.min(retry_cfg.max_backoff());
                    debug!("rate limiter has requested a {delay:?} delay before retrying");
                    Ok(delay)
                } else {
                    let base = if retry_cfg.use_static_exponential_base() {
                        1.0
                    } else {
                        fastrand::f64()
                    };
                    Ok(calculate_exponential_backoff(
                        base,
                        retry_cfg.initial_backoff().as_secs_f64(),
                        request_attempts - 1,
                        retry_cfg.max_backoff(),
                    ))
                }
            }
            RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
                debug!(
                    attempts = request_attempts,
                    max_attempts = retry_cfg.max_attempts(),
                    "encountered un-retryable error"
                );
                Err(ShouldAttempt::No)
            }
            _ => unreachable!("RetryAction is non-exhaustive"),
        }
    }
}

enum ReleaseResult {
    APermitWasReleased,
    NoPermitWasReleased,
}

impl RetryStrategy for StandardRetryStrategy {
    fn should_attempt_initial_request(
        &self,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Result<ShouldAttempt, BoxError> {
        if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
            let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
            if let Err(delay) = crl.acquire_permission_to_send_a_request(
                seconds_since_unix_epoch,
                RequestReason::InitialRequest,
            ) {
                return Ok(ShouldAttempt::YesAfterDelay(delay));
            }
        } else {
            debug!("no client rate limiter configured, so no token is required for the initial request.");
        }

        Ok(ShouldAttempt::Yes)
    }

    fn should_attempt_retry(
        &self,
        ctx: &InterceptorContext,
        runtime_components: &RuntimeComponents,
        cfg: &ConfigBag,
    ) -> Result<ShouldAttempt, BoxError> {
        let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");

        let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
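        // Run the retry classifiers against the context to determine whether, and
        // why, this attempt should be retried.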
        let retry_classifiers = runtime_components.retry_classifiers();
        let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);

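        // (Adaptive mode only) The rate limiter is updated on every attempt, and it
        // needs to know whether the error was a throttling error; the classifier
        // result carries that information.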
        let error_kind = error_kind(&classifier_result);
        let is_throttling_error = error_kind
            .map(|kind| kind == ErrorKind::ThrottlingError)
            .unwrap_or(false);
        update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);

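        // On a successful attempt, release any held retry permit back to the token
        // bucket; if no permit was held, regenerate a token to make up for permits
        // that were previously "forgotten" so the bucket can refill over time.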
        if !ctx.is_failed() {
            if let NoPermitWasReleased = self.release_retry_permit() {
                token_bucket.regenerate_a_token();
            }
        }

        let request_attempts = cfg
            .load::<RequestAttempts>()
            .expect("at least one request attempt is made before any retry is attempted")
            .attempts();

        if !classifier_result.should_retry() {
            debug!(
                "attempt #{request_attempts} classified as {:?}, not retrying",
                classifier_result
            );
            return Ok(ShouldAttempt::No);
        }

        if request_attempts >= retry_cfg.max_attempts() {
            debug!(
                attempts = request_attempts,
                max_attempts = retry_cfg.max_attempts(),
                "not retrying because we are out of attempts"
            );
            return Ok(ShouldAttempt::No);
        }

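        // A retry also costs quota: acquire a permit from the token bucket, scaled
        // by error kind, and hold it until the retried attempt resolves.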
        let error_kind = error_kind.expect("result was classified retryable");
        match token_bucket.acquire(&error_kind) {
            Some(permit) => self.set_retry_permit(permit),
            None => {
                debug!("attempt #{request_attempts} failed with {error_kind:?}; however, not enough retry quota is available for another attempt, so no retry will be attempted");
                return Ok(ShouldAttempt::No);
            }
        }

        let backoff =
            match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
                Ok(value) => value,
                Err(value) => return Ok(value),
            };

        debug!(
            "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
            classifier_result, backoff
        );
        Ok(ShouldAttempt::YesAfterDelay(backoff))
    }
}

fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
    match classifier_result {
        RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
        _ => None,
    }
}

fn update_rate_limiter_if_exists(
    runtime_components: &RuntimeComponents,
    cfg: &ConfigBag,
    is_throttling_error: bool,
) {
    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
        let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
        crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
    }
}

fn check_rate_limiter_for_delay(
    runtime_components: &RuntimeComponents,
    cfg: &ConfigBag,
    kind: ErrorKind,
) -> Option<Duration> {
    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
        let retry_reason = if kind == ErrorKind::ThrottlingError {
            RequestReason::RetryTimeout
        } else {
            RequestReason::Retry
        };
        if let Err(delay) = crl.acquire_permission_to_send_a_request(
            get_seconds_since_unix_epoch(runtime_components),
            retry_reason,
        ) {
            return Some(delay);
        }
    }

    None
}

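/// Calculates `(2^retry_attempts * initial_backoff).min(max_backoff) * base`: exponential
/// backoff is clamped to `max_backoff` before the jitter `base` is applied, and any
/// overflow along the way falls back to `max_backoff`.
///
/// Worked example with `base = 1.0`, `initial_backoff = 1.0` (seconds), and a 20s
/// `max_backoff`: attempts 0 through 5 back off for 1s, 2s, 4s, 8s, 16s, and 20s (capped).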
fn calculate_exponential_backoff(
    base: f64,
    initial_backoff: f64,
    retry_attempts: u32,
    max_backoff: Duration,
) -> Duration {
    let result = match 2_u32
        .checked_pow(retry_attempts)
        .map(|power| (power as f64) * initial_backoff)
    {
        Some(backoff) => match Duration::try_from_secs_f64(backoff) {
            Ok(result) => result.min(max_backoff),
            Err(e) => {
                tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
                max_backoff
            }
        },
        None => max_backoff,
    };

    result.mul_f64(base)
}

fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
    let request_time = runtime_components
        .time_source()
        .expect("time source required for retries");
    request_time
        .now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs_f64()
}

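/// Interceptor that resolves the [`TokenBucket`] for the active [`RetryPartition`] and
/// stores it in the config bag before the retry loop begins.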
#[derive(Debug)]
pub(crate) struct TokenBucketProvider {
    default_partition: RetryPartition,
    token_bucket: TokenBucket,
}

impl TokenBucketProvider {
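    /// Creates a new `TokenBucketProvider` for the given default retry partition.
    ///
    /// The default partition's token bucket is resolved once up front so that the
    /// shared partition map does not need to be consulted on every request.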
    pub(crate) fn new(default_partition: RetryPartition) -> Self {
        let token_bucket = TOKEN_BUCKET.get_or_init_default(default_partition.clone());
        Self {
            default_partition,
            token_bucket,
        }
    }
}

impl Intercept for TokenBucketProvider {
    fn name(&self) -> &'static str {
        "TokenBucketProvider"
    }

    fn modify_before_retry_loop(
        &self,
        _context: &mut BeforeTransmitInterceptorContextMut<'_>,
        _runtime_components: &RuntimeComponents,
        cfg: &mut ConfigBag,
    ) -> Result<(), BoxError> {
        let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");

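        // The bucket for the default partition was cached at construction time; only
        // consult the shared partition map when an operation overrides the partition.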
        let tb = if *retry_partition != self.default_partition {
            TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
        } else {
            self.token_bucket.clone()
        };

        trace!("token bucket for {retry_partition:?} added to config bag");
        let mut layer = Layer::new("token_bucket_partition");
        layer.store_put(tb);
        cfg.push_layer(layer);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use std::fmt;
    use std::sync::Mutex;
    use std::time::Duration;

    use aws_smithy_runtime_api::client::interceptors::context::{
        Input, InterceptorContext, Output,
    };
    use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
    use aws_smithy_runtime_api::client::retries::classifiers::{
        ClassifyRetry, RetryAction, SharedRetryClassifier,
    };
    use aws_smithy_runtime_api::client::retries::{
        AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
    };
    use aws_smithy_runtime_api::client::runtime_components::{
        RuntimeComponents, RuntimeComponentsBuilder,
    };
    use aws_smithy_types::config_bag::{ConfigBag, Layer};
    use aws_smithy_types::retry::{ErrorKind, RetryConfig};

    use super::{calculate_exponential_backoff, StandardRetryStrategy};
    use crate::client::retries::TokenBucket;

    #[test]
    fn no_retry_necessary_for_ok_result() {
        let cfg = ConfigBag::of_layers(vec![{
            let mut layer = Layer::new("test");
            layer.store_put(RetryConfig::standard());
            layer.store_put(RequestAttempts::new(1));
            layer.store_put(TokenBucket::default());
            layer
        }]);
        let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        let strategy = StandardRetryStrategy::default();
        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::No, actual);
    }

    fn set_up_cfg_and_context(
        error_kind: ErrorKind,
        current_request_attempts: u32,
        retry_config: RetryConfig,
    ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(RequestAttempts::new(current_request_attempts));
        layer.store_put(retry_config);
        layer.store_put(TokenBucket::default());
        let cfg = ConfigBag::of_layers(vec![layer]);

        (ctx, rc, cfg)
    }

    fn test_should_retry_error_kind(error_kind: ErrorKind) {
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            error_kind,
            3,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(4),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
    }

    #[test]
    fn should_retry_transient_error_result_after_4s() {
        test_should_retry_error_kind(ErrorKind::TransientError);
    }

    #[test]
    fn should_retry_client_error_result_after_4s() {
        test_should_retry_error_kind(ErrorKind::ClientError);
    }

    #[test]
    fn should_retry_server_error_result_after_4s() {
        test_should_retry_error_kind(ErrorKind::ServerError);
    }

    #[test]
    fn should_retry_throttling_error_result_after_4s() {
        test_should_retry_error_kind(ErrorKind::ThrottlingError);
    }

    #[test]
    fn dont_retry_when_out_of_attempts() {
        let current_attempts = 4;
        let max_attempts = current_attempts;
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            ErrorKind::TransientError,
            current_attempts,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(max_attempts),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::No, actual);
    }

    #[test]
    fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
        let (ctx, rc, cfg) = set_up_cfg_and_context(
            ErrorKind::TransientError,
            33,
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(100),
        );
        let strategy = StandardRetryStrategy::new();
        let actual = strategy
            .should_attempt_retry(&ctx, &rc, &cfg)
            .expect("method is infallible for this use");
        assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
    }

    #[allow(dead_code)]
    #[derive(Debug)]
    struct PresetReasonRetryClassifier {
        retry_actions: Mutex<Vec<RetryAction>>,
    }

    #[cfg(feature = "test-util")]
    impl PresetReasonRetryClassifier {
        fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
            retry_reasons.reverse();
            Self {
                retry_actions: Mutex::new(retry_reasons),
            }
        }
    }

    impl ClassifyRetry for PresetReasonRetryClassifier {
        fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
            let output_or_error = ctx.output_or_error();
            match output_or_error {
                Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
                _ => (),
            };

            let mut retry_actions = self.retry_actions.lock().unwrap();
            if retry_actions.len() == 1 {
                retry_actions.first().unwrap().clone()
            } else {
                retry_actions.pop().unwrap()
            }
        }

        fn name(&self) -> &'static str {
            "Always returns a preset retry reason"
        }
    }

    #[cfg(feature = "test-util")]
    fn setup_test(
        retry_reasons: Vec<RetryAction>,
        retry_config: RetryConfig,
    ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(
                PresetReasonRetryClassifier::new(retry_reasons),
            ))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(retry_config);
        let cfg = ConfigBag::of_layers(vec![layer]);
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));

        (cfg, rc, ctx)
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn eventual_success() {
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 495);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn no_more_attempts() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(3),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 490);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn successful_request_and_deser_should_be_retryable() {
        #[derive(Clone, Copy, Debug)]
        enum LongRunningOperationStatus {
            Running,
            Complete,
        }

        #[derive(Debug)]
        struct LongRunningOperationOutput {
            status: Option<LongRunningOperationStatus>,
        }

        impl LongRunningOperationOutput {
            fn status(&self) -> Option<LongRunningOperationStatus> {
                self.status
            }
        }

        struct WaiterRetryClassifier {}

        impl WaiterRetryClassifier {
            fn new() -> Self {
                WaiterRetryClassifier {}
            }
        }

        impl fmt::Debug for WaiterRetryClassifier {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                write!(f, "WaiterRetryClassifier")
            }
        }

        impl ClassifyRetry for WaiterRetryClassifier {
            fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
                let status: Option<LongRunningOperationStatus> =
                    ctx.output_or_error().and_then(|res| {
                        res.ok().and_then(|output| {
                            output
                                .downcast_ref::<LongRunningOperationOutput>()
                                .and_then(|output| output.status())
                        })
                    });

                if let Some(LongRunningOperationStatus::Running) = status {
                    return RetryAction::server_error();
                };

                RetryAction::NoActionIndicated
            }

            fn name(&self) -> &'static str {
                "waiter retry classifier"
            }
        }

        let retry_config = RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(5);

        let rc = RuntimeComponentsBuilder::for_tests()
            .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
            .build()
            .unwrap();
        let mut layer = Layer::new("test");
        layer.store_put(retry_config);
        let mut cfg = ConfigBag::of_layers(vec![layer]);
        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
        let strategy = StandardRetryStrategy::new();

        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
            status: Some(LongRunningOperationStatus::Running),
        })));

        cfg.interceptor_state().store_put(TokenBucket::new(5));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 0);

        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
            status: Some(LongRunningOperationStatus::Complete),
        })));
        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        should_retry.expect_no();
        assert_eq!(token_bucket.available_permits(), 5);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn no_quota() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::new(5));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 0);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 0);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn quota_replenishes_on_success() {
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![
                RetryAction::transient_error(),
                RetryAction::retryable_error_with_explicit_delay(
                    ErrorKind::TransientError,
                    Duration::from_secs(1),
                ),
            ],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::new(100));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 90);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 80);

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);

        assert_eq!(token_bucket.available_permits(), 90);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn quota_replenishes_on_first_try_success() {
        const PERMIT_COUNT: usize = 20;
        let (mut cfg, rc, mut ctx) = setup_test(
            vec![RetryAction::transient_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(u32::MAX),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state()
            .store_put(TokenBucket::new(PERMIT_COUNT));
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        let mut attempt = 1;

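        // Drain all available permits with failed attempts.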
        while token_bucket.available_permits() > 0 {
            if attempt > 2 {
                panic!("This test should have completed by now (drain)");
            }

            cfg.interceptor_state()
                .store_put(RequestAttempts::new(attempt));
            let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
            assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
            attempt += 1;
        }

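        // Take the held permit and forget it so it can never be released back; the
        // bucket can now only refill through successful attempts.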
        let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
        permit.forget();

        ctx.set_output_or_error(Ok(Output::doesnt_matter()));

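        // Each success regenerates one token until the bucket is full again.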
        while token_bucket.available_permits() < PERMIT_COUNT {
            if attempt > 23 {
                panic!("This test should have completed by now (fill-up)");
            }

            cfg.interceptor_state()
                .store_put(RequestAttempts::new(attempt));
            let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
            assert_eq!(no_retry, ShouldAttempt::No);
            attempt += 1;
        }

        assert_eq!(attempt, 23);
        assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn backoff_timing() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(4));
        assert_eq!(token_bucket.available_permits(), 485);

        cfg.interceptor_state().store_put(RequestAttempts::new(4));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(8));
        assert_eq!(token_bucket.available_permits(), 480);

        cfg.interceptor_state().store_put(RequestAttempts::new(5));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 480);
    }

    #[cfg(feature = "test-util")]
    #[test]
    fn max_backoff_time() {
        let (mut cfg, rc, ctx) = setup_test(
            vec![RetryAction::server_error()],
            RetryConfig::standard()
                .with_use_static_exponential_base(true)
                .with_max_attempts(5)
                .with_initial_backoff(Duration::from_secs(1))
                .with_max_backoff(Duration::from_secs(3)),
        );
        let strategy = StandardRetryStrategy::new();
        cfg.interceptor_state().store_put(TokenBucket::default());
        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

        cfg.interceptor_state().store_put(RequestAttempts::new(1));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(1));
        assert_eq!(token_bucket.available_permits(), 495);

        cfg.interceptor_state().store_put(RequestAttempts::new(2));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(2));
        assert_eq!(token_bucket.available_permits(), 490);

        cfg.interceptor_state().store_put(RequestAttempts::new(3));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(3));
        assert_eq!(token_bucket.available_permits(), 485);

        cfg.interceptor_state().store_put(RequestAttempts::new(4));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        let dur = should_retry.expect_delay();
        assert_eq!(dur, Duration::from_secs(3));
        assert_eq!(token_bucket.available_permits(), 480);

        cfg.interceptor_state().store_put(RequestAttempts::new(5));
        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(no_retry, ShouldAttempt::No);
        assert_eq!(token_bucket.available_permits(), 480);
    }

    const MAX_BACKOFF: Duration = Duration::from_secs(20);

    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_one() {
        let initial_backoff = 1.0;

        for (attempt, expected_backoff) in [initial_backoff, 2.0, 4.0].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
        let initial_backoff = 3.0;

        for (attempt, expected_backoff) in [initial_backoff, 6.0, 12.0].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    #[test]
    fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
        let initial_backoff = 0.03;

        for (attempt, expected_backoff) in [initial_backoff, 0.06, 0.12].into_iter().enumerate() {
            let actual_backoff =
                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
        }
    }

    #[test]
    fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
        assert_eq!(
            MAX_BACKOFF,
            calculate_exponential_backoff(1_f64, 10_f64, 100000, MAX_BACKOFF),
        );
    }
}