aws_smithy_runtime/client/retries/strategy/
standard.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use std::sync::Mutex;
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::OwnedSemaphorePermit;
10use tracing::{debug, trace};
11
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::interceptors::context::{
14    BeforeTransmitInterceptorContextMut, InterceptorContext,
15};
16use aws_smithy_runtime_api::client::interceptors::Intercept;
17use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
18use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
19use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
20use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
21use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};
22
23use crate::client::retries::classifiers::run_classifiers_on_ctx;
24use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
25use crate::client::retries::strategy::standard::ReleaseResult::{
26    APermitWasReleased, NoPermitWasReleased,
27};
28use crate::client::retries::token_bucket::TokenBucket;
29use crate::client::retries::{ClientRateLimiterPartition, RetryPartition};
30use crate::static_partition_map::StaticPartitionMap;
31
32static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
33    StaticPartitionMap::new();
34
35/// Used by token bucket interceptor to ensure a TokenBucket always exists in config bag
36static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();
37
38/// Retry strategy with exponential backoff, max attempts, and a token bucket.
39#[derive(Debug, Default)]
40pub struct StandardRetryStrategy {
41    retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
42}
43
44impl Storable for StandardRetryStrategy {
45    type Storer = StoreReplace<Self>;
46}
47
48impl StandardRetryStrategy {
49    /// Create a new standard retry strategy with the given config.
50    pub fn new() -> Self {
51        Default::default()
52    }
53
54    fn release_retry_permit(&self) -> ReleaseResult {
55        let mut retry_permit = self.retry_permit.lock().unwrap();
56        match retry_permit.take() {
57            Some(p) => {
58                drop(p);
59                APermitWasReleased
60            }
61            None => NoPermitWasReleased,
62        }
63    }
64
65    fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
66        let mut old_retry_permit = self.retry_permit.lock().unwrap();
67        if let Some(p) = old_retry_permit.replace(new_retry_permit) {
68            // Whenever we set a new retry permit, and it replaces the old one, we need to "forget"
69            // the old permit, removing it from the bucket forever.
70            p.forget()
71        }
72    }
73
74    /// Returns a [`ClientRateLimiter`] if adaptive retry is configured.
75    fn adaptive_retry_rate_limiter(
76        runtime_components: &RuntimeComponents,
77        cfg: &ConfigBag,
78    ) -> Option<ClientRateLimiter> {
79        let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
80        if retry_config.mode() == RetryMode::Adaptive {
81            if let Some(time_source) = runtime_components.time_source() {
82                let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
83                let seconds_since_unix_epoch = time_source
84                    .now()
85                    .duration_since(SystemTime::UNIX_EPOCH)
86                    .expect("the present takes place after the UNIX_EPOCH")
87                    .as_secs_f64();
88                let client_rate_limiter_partition =
89                    ClientRateLimiterPartition::new(retry_partition.clone());
90                let client_rate_limiter = CLIENT_RATE_LIMITER
91                    .get_or_init(client_rate_limiter_partition, || {
92                        ClientRateLimiter::new(seconds_since_unix_epoch)
93                    });
94                return Some(client_rate_limiter);
95            }
96        }
97        None
98    }
99
100    fn calculate_backoff(
101        &self,
102        runtime_components: &RuntimeComponents,
103        cfg: &ConfigBag,
104        retry_cfg: &RetryConfig,
105        retry_reason: &RetryAction,
106    ) -> Result<Duration, ShouldAttempt> {
107        let request_attempts = cfg
108            .load::<RequestAttempts>()
109            .expect("at least one request attempt is made before any retry is attempted")
110            .attempts();
111
112        match retry_reason {
113            RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
114                if let Some(delay) = *retry_after {
115                    let delay = delay.min(retry_cfg.max_backoff());
116                    debug!("explicit request from server to delay {delay:?} before retrying");
117                    Ok(delay)
118                } else if let Some(delay) =
119                    check_rate_limiter_for_delay(runtime_components, cfg, *kind)
120                {
121                    let delay = delay.min(retry_cfg.max_backoff());
122                    debug!("rate limiter has requested a {delay:?} delay before retrying");
123                    Ok(delay)
124                } else {
125                    let base = if retry_cfg.use_static_exponential_base() {
126                        1.0
127                    } else {
128                        fastrand::f64()
129                    };
130                    Ok(calculate_exponential_backoff(
131                        // Generate a random base multiplier to create jitter
132                        base,
133                        // Get the backoff time multiplier in seconds (with fractional seconds)
134                        retry_cfg.initial_backoff().as_secs_f64(),
135                        // `self.local.attempts` tracks number of requests made including the initial request
136                        // The initial attempt shouldn't count towards backoff calculations, so we subtract it
137                        request_attempts - 1,
138                        // Maximum backoff duration as a fallback to prevent overflow when calculating a power
139                        retry_cfg.max_backoff(),
140                    ))
141                }
142            }
143            RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
144                debug!(
145                    attempts = request_attempts,
146                    max_attempts = retry_cfg.max_attempts(),
147                    "encountered un-retryable error"
148                );
149                Err(ShouldAttempt::No)
150            }
151            _ => unreachable!("RetryAction is non-exhaustive"),
152        }
153    }
154}
155
/// Outcome of `StandardRetryStrategy::release_retry_permit`.
enum ReleaseResult {
    /// A retry permit was held and has now been dropped.
    APermitWasReleased,
    /// No retry permit was held, so there was nothing to drop.
    NoPermitWasReleased,
}
160
161impl RetryStrategy for StandardRetryStrategy {
162    fn should_attempt_initial_request(
163        &self,
164        runtime_components: &RuntimeComponents,
165        cfg: &ConfigBag,
166    ) -> Result<ShouldAttempt, BoxError> {
167        if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
168            let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
169            if let Err(delay) = crl.acquire_permission_to_send_a_request(
170                seconds_since_unix_epoch,
171                RequestReason::InitialRequest,
172            ) {
173                return Ok(ShouldAttempt::YesAfterDelay(delay));
174            }
175        } else {
176            debug!("no client rate limiter configured, so no token is required for the initial request.");
177        }
178
179        Ok(ShouldAttempt::Yes)
180    }
181
182    fn should_attempt_retry(
183        &self,
184        ctx: &InterceptorContext,
185        runtime_components: &RuntimeComponents,
186        cfg: &ConfigBag,
187    ) -> Result<ShouldAttempt, BoxError> {
188        let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
189
190        // bookkeeping
191        let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
192        // run the classifier against the context to determine if we should retry
193        let retry_classifiers = runtime_components.retry_classifiers();
194        let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);
195
196        // (adaptive only): update fill rate
197        // NOTE: SEP indicates doing bookkeeping before asking if we should retry. We need to know if
198        // the error was a throttling error though to do adaptive retry bookkeeping so we take
199        // advantage of that information being available via the classifier result
200        let error_kind = error_kind(&classifier_result);
201        let is_throttling_error = error_kind
202            .map(|kind| kind == ErrorKind::ThrottlingError)
203            .unwrap_or(false);
204        update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);
205
206        // on success release any retry quota held by previous attempts
207        if !ctx.is_failed() {
208            if let NoPermitWasReleased = self.release_retry_permit() {
209                // In the event that there was no retry permit to release, we generate new
210                // permits from nothing. We do this to make up for permits we had to "forget".
211                // Otherwise, repeated retries would empty the bucket and nothing could fill it
212                // back up again.
213                token_bucket.regenerate_a_token();
214            }
215        }
216        // end bookkeeping
217
218        let request_attempts = cfg
219            .load::<RequestAttempts>()
220            .expect("at least one request attempt is made before any retry is attempted")
221            .attempts();
222
223        // check if retry should be attempted
224        if !classifier_result.should_retry() {
225            debug!(
226                "attempt #{request_attempts} classified as {:?}, not retrying",
227                classifier_result
228            );
229            return Ok(ShouldAttempt::No);
230        }
231
232        // check if we're out of attempts
233        if request_attempts >= retry_cfg.max_attempts() {
234            debug!(
235                attempts = request_attempts,
236                max_attempts = retry_cfg.max_attempts(),
237                "not retrying because we are out of attempts"
238            );
239            return Ok(ShouldAttempt::No);
240        }
241
242        //  acquire permit for retry
243        let error_kind = error_kind.expect("result was classified retryable");
244        match token_bucket.acquire(&error_kind) {
245            Some(permit) => self.set_retry_permit(permit),
246            None => {
247                debug!("attempt #{request_attempts} failed with {error_kind:?}; However, not enough retry quota is available for another attempt so no retry will be attempted.");
248                return Ok(ShouldAttempt::No);
249            }
250        }
251
252        // calculate delay until next attempt
253        let backoff =
254            match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
255                Ok(value) => value,
256                // In some cases, backoff calculation will decide that we shouldn't retry at all.
257                Err(value) => return Ok(value),
258            };
259
260        debug!(
261            "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
262            classifier_result, backoff
263        );
264        Ok(ShouldAttempt::YesAfterDelay(backoff))
265    }
266}
267
268/// extract the error kind from the classifier result if available
269fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
270    match classifier_result {
271        RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
272        _ => None,
273    }
274}
275
276fn update_rate_limiter_if_exists(
277    runtime_components: &RuntimeComponents,
278    cfg: &ConfigBag,
279    is_throttling_error: bool,
280) {
281    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
282        let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
283        crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
284    }
285}
286
287fn check_rate_limiter_for_delay(
288    runtime_components: &RuntimeComponents,
289    cfg: &ConfigBag,
290    kind: ErrorKind,
291) -> Option<Duration> {
292    if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
293        let retry_reason = if kind == ErrorKind::ThrottlingError {
294            RequestReason::RetryTimeout
295        } else {
296            RequestReason::Retry
297        };
298        if let Err(delay) = crl.acquire_permission_to_send_a_request(
299            get_seconds_since_unix_epoch(runtime_components),
300            retry_reason,
301        ) {
302            return Some(delay);
303        }
304    }
305
306    None
307}
308
309fn calculate_exponential_backoff(
310    base: f64,
311    initial_backoff: f64,
312    retry_attempts: u32,
313    max_backoff: Duration,
314) -> Duration {
315    let result = match 2_u32
316        .checked_pow(retry_attempts)
317        .map(|power| (power as f64) * initial_backoff)
318    {
319        Some(backoff) => match Duration::try_from_secs_f64(backoff) {
320            Ok(result) => result.min(max_backoff),
321            Err(e) => {
322                tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
323                max_backoff
324            }
325        },
326        None => max_backoff,
327    };
328
329    // Apply jitter to `result`, and note that it can be applied to `max_backoff`.
330    // Won't panic because `base` is either in range 0..1 or a constant 1 in testing (if configured).
331    result.mul_f64(base)
332}
333
334fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
335    let request_time = runtime_components
336        .time_source()
337        .expect("time source required for retries");
338    request_time
339        .now()
340        .duration_since(SystemTime::UNIX_EPOCH)
341        .unwrap()
342        .as_secs_f64()
343}
344
345/// Interceptor registered in default retry plugin that ensures a token bucket exists in config
346/// bag for every operation. Token bucket provided is partitioned by the retry partition **in the
347/// config bag** at the time an operation is executed.
348#[derive(Debug)]
349pub(crate) struct TokenBucketProvider {
350    default_partition: RetryPartition,
351    token_bucket: TokenBucket,
352}
353
354impl TokenBucketProvider {
355    /// Create a new token bucket provider with the given default retry partition.
356    ///
357    /// NOTE: This partition should be the one used for every operation on a client
358    /// unless config is overridden.
359    pub(crate) fn new(default_partition: RetryPartition) -> Self {
360        let token_bucket = TOKEN_BUCKET.get_or_init_default(default_partition.clone());
361        Self {
362            default_partition,
363            token_bucket,
364        }
365    }
366}
367
368impl Intercept for TokenBucketProvider {
369    fn name(&self) -> &'static str {
370        "TokenBucketProvider"
371    }
372
373    fn modify_before_retry_loop(
374        &self,
375        _context: &mut BeforeTransmitInterceptorContextMut<'_>,
376        _runtime_components: &RuntimeComponents,
377        cfg: &mut ConfigBag,
378    ) -> Result<(), BoxError> {
379        let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
380
381        // we store the original retry partition configured and associated token bucket
382        // for the client when created so that we can avoid locking on _every_ request
383        // from _every_ client
384        let tb = if *retry_partition != self.default_partition {
385            TOKEN_BUCKET.get_or_init_default(retry_partition.clone())
386        } else {
387            // avoid contention on the global lock
388            self.token_bucket.clone()
389        };
390
391        trace!("token bucket for {retry_partition:?} added to config bag");
392        let mut layer = Layer::new("token_bucket_partition");
393        layer.store_put(tb);
394        cfg.push_layer(layer);
395        Ok(())
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    #[allow(unused_imports)] // will be unused with `--no-default-features --features client`
402    use std::fmt;
403    use std::sync::Mutex;
404    use std::time::Duration;
405
406    use aws_smithy_runtime_api::client::interceptors::context::{
407        Input, InterceptorContext, Output,
408    };
409    use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
410    use aws_smithy_runtime_api::client::retries::classifiers::{
411        ClassifyRetry, RetryAction, SharedRetryClassifier,
412    };
413    use aws_smithy_runtime_api::client::retries::{
414        AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
415    };
416    use aws_smithy_runtime_api::client::runtime_components::{
417        RuntimeComponents, RuntimeComponentsBuilder,
418    };
419    use aws_smithy_types::config_bag::{ConfigBag, Layer};
420    use aws_smithy_types::retry::{ErrorKind, RetryConfig};
421
422    use super::{calculate_exponential_backoff, StandardRetryStrategy};
423    use crate::client::retries::TokenBucket;
424
425    #[test]
426    fn no_retry_necessary_for_ok_result() {
427        let cfg = ConfigBag::of_layers(vec![{
428            let mut layer = Layer::new("test");
429            layer.store_put(RetryConfig::standard());
430            layer.store_put(RequestAttempts::new(1));
431            layer.store_put(TokenBucket::default());
432            layer
433        }]);
434        let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
435        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
436        let strategy = StandardRetryStrategy::default();
437        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
438
439        let actual = strategy
440            .should_attempt_retry(&ctx, &rc, &cfg)
441            .expect("method is infallible for this use");
442        assert_eq!(ShouldAttempt::No, actual);
443    }
444
445    fn set_up_cfg_and_context(
446        error_kind: ErrorKind,
447        current_request_attempts: u32,
448        retry_config: RetryConfig,
449    ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
450        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
451        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
452        let rc = RuntimeComponentsBuilder::for_tests()
453            .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
454            .build()
455            .unwrap();
456        let mut layer = Layer::new("test");
457        layer.store_put(RequestAttempts::new(current_request_attempts));
458        layer.store_put(retry_config);
459        layer.store_put(TokenBucket::default());
460        let cfg = ConfigBag::of_layers(vec![layer]);
461
462        (ctx, rc, cfg)
463    }
464
465    // Test that error kinds produce the correct "retry after X seconds" output.
466    // All error kinds are handled in the same way for the standard strategy.
467    fn test_should_retry_error_kind(error_kind: ErrorKind) {
468        let (ctx, rc, cfg) = set_up_cfg_and_context(
469            error_kind,
470            3,
471            RetryConfig::standard()
472                .with_use_static_exponential_base(true)
473                .with_max_attempts(4),
474        );
475        let strategy = StandardRetryStrategy::new();
476        let actual = strategy
477            .should_attempt_retry(&ctx, &rc, &cfg)
478            .expect("method is infallible for this use");
479        assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
480    }
481
482    #[test]
483    fn should_retry_transient_error_result_after_2s() {
484        test_should_retry_error_kind(ErrorKind::TransientError);
485    }
486
487    #[test]
488    fn should_retry_client_error_result_after_2s() {
489        test_should_retry_error_kind(ErrorKind::ClientError);
490    }
491
492    #[test]
493    fn should_retry_server_error_result_after_2s() {
494        test_should_retry_error_kind(ErrorKind::ServerError);
495    }
496
497    #[test]
498    fn should_retry_throttling_error_result_after_2s() {
499        test_should_retry_error_kind(ErrorKind::ThrottlingError);
500    }
501
502    #[test]
503    fn dont_retry_when_out_of_attempts() {
504        let current_attempts = 4;
505        let max_attempts = current_attempts;
506        let (ctx, rc, cfg) = set_up_cfg_and_context(
507            ErrorKind::TransientError,
508            current_attempts,
509            RetryConfig::standard()
510                .with_use_static_exponential_base(true)
511                .with_max_attempts(max_attempts),
512        );
513        let strategy = StandardRetryStrategy::new();
514        let actual = strategy
515            .should_attempt_retry(&ctx, &rc, &cfg)
516            .expect("method is infallible for this use");
517        assert_eq!(ShouldAttempt::No, actual);
518    }
519
520    #[test]
521    fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
522        let (ctx, rc, cfg) = set_up_cfg_and_context(
523            ErrorKind::TransientError,
524            // Greater than 32 when subtracted by 1 in `calculate_backoff`, causing overflow in `calculate_exponential_backoff`
525            33,
526            RetryConfig::standard()
527                .with_use_static_exponential_base(true)
528                .with_max_attempts(100), // Any value greater than 33 will do
529        );
530        let strategy = StandardRetryStrategy::new();
531        let actual = strategy
532            .should_attempt_retry(&ctx, &rc, &cfg)
533            .expect("method is infallible for this use");
534        assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
535    }
536
537    #[allow(dead_code)] // will be unused with `--no-default-features --features client`
538    #[derive(Debug)]
539    struct PresetReasonRetryClassifier {
540        retry_actions: Mutex<Vec<RetryAction>>,
541    }
542
543    #[cfg(feature = "test-util")]
544    impl PresetReasonRetryClassifier {
545        fn new(mut retry_reasons: Vec<RetryAction>) -> Self {
546            // We'll pop the retry_reasons in reverse order, so we reverse the list to fix that.
547            retry_reasons.reverse();
548            Self {
549                retry_actions: Mutex::new(retry_reasons),
550            }
551        }
552    }
553
554    impl ClassifyRetry for PresetReasonRetryClassifier {
555        fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
556            // Check for a result
557            let output_or_error = ctx.output_or_error();
558            // Check for an error
559            match output_or_error {
560                Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
561                _ => (),
562            };
563
564            let mut retry_actions = self.retry_actions.lock().unwrap();
565            if retry_actions.len() == 1 {
566                retry_actions.first().unwrap().clone()
567            } else {
568                retry_actions.pop().unwrap()
569            }
570        }
571
572        fn name(&self) -> &'static str {
573            "Always returns a preset retry reason"
574        }
575    }
576
577    #[cfg(feature = "test-util")]
578    fn setup_test(
579        retry_reasons: Vec<RetryAction>,
580        retry_config: RetryConfig,
581    ) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
582        let rc = RuntimeComponentsBuilder::for_tests()
583            .with_retry_classifier(SharedRetryClassifier::new(
584                PresetReasonRetryClassifier::new(retry_reasons),
585            ))
586            .build()
587            .unwrap();
588        let mut layer = Layer::new("test");
589        layer.store_put(retry_config);
590        let cfg = ConfigBag::of_layers(vec![layer]);
591        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
592        // This type doesn't matter b/c the classifier will just return whatever we tell it to.
593        ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
594
595        (cfg, rc, ctx)
596    }
597
598    #[cfg(feature = "test-util")]
599    #[test]
600    fn eventual_success() {
601        let (mut cfg, rc, mut ctx) = setup_test(
602            vec![RetryAction::server_error()],
603            RetryConfig::standard()
604                .with_use_static_exponential_base(true)
605                .with_max_attempts(5),
606        );
607        let strategy = StandardRetryStrategy::new();
608        cfg.interceptor_state().store_put(TokenBucket::default());
609        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
610
611        cfg.interceptor_state().store_put(RequestAttempts::new(1));
612        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
613        let dur = should_retry.expect_delay();
614        assert_eq!(dur, Duration::from_secs(1));
615        assert_eq!(token_bucket.available_permits(), 495);
616
617        cfg.interceptor_state().store_put(RequestAttempts::new(2));
618        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
619        let dur = should_retry.expect_delay();
620        assert_eq!(dur, Duration::from_secs(2));
621        assert_eq!(token_bucket.available_permits(), 490);
622
623        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
624
625        cfg.interceptor_state().store_put(RequestAttempts::new(3));
626        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
627        assert_eq!(no_retry, ShouldAttempt::No);
628        assert_eq!(token_bucket.available_permits(), 495);
629    }
630
631    #[cfg(feature = "test-util")]
632    #[test]
633    fn no_more_attempts() {
634        let (mut cfg, rc, ctx) = setup_test(
635            vec![RetryAction::server_error()],
636            RetryConfig::standard()
637                .with_use_static_exponential_base(true)
638                .with_max_attempts(3),
639        );
640        let strategy = StandardRetryStrategy::new();
641        cfg.interceptor_state().store_put(TokenBucket::default());
642        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
643
644        cfg.interceptor_state().store_put(RequestAttempts::new(1));
645        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
646        let dur = should_retry.expect_delay();
647        assert_eq!(dur, Duration::from_secs(1));
648        assert_eq!(token_bucket.available_permits(), 495);
649
650        cfg.interceptor_state().store_put(RequestAttempts::new(2));
651        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
652        let dur = should_retry.expect_delay();
653        assert_eq!(dur, Duration::from_secs(2));
654        assert_eq!(token_bucket.available_permits(), 490);
655
656        cfg.interceptor_state().store_put(RequestAttempts::new(3));
657        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
658        assert_eq!(no_retry, ShouldAttempt::No);
659        assert_eq!(token_bucket.available_permits(), 490);
660    }
661
662    #[cfg(feature = "test-util")]
663    #[test]
664    fn successful_request_and_deser_should_be_retryable() {
665        #[derive(Clone, Copy, Debug)]
666        enum LongRunningOperationStatus {
667            Running,
668            Complete,
669        }
670
671        #[derive(Debug)]
672        struct LongRunningOperationOutput {
673            status: Option<LongRunningOperationStatus>,
674        }
675
676        impl LongRunningOperationOutput {
677            fn status(&self) -> Option<LongRunningOperationStatus> {
678                self.status
679            }
680        }
681
682        struct WaiterRetryClassifier {}
683
684        impl WaiterRetryClassifier {
685            fn new() -> Self {
686                WaiterRetryClassifier {}
687            }
688        }
689
690        impl fmt::Debug for WaiterRetryClassifier {
691            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
692                write!(f, "WaiterRetryClassifier")
693            }
694        }
695        impl ClassifyRetry for WaiterRetryClassifier {
696            fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
697                let status: Option<LongRunningOperationStatus> =
698                    ctx.output_or_error().and_then(|res| {
699                        res.ok().and_then(|output| {
700                            output
701                                .downcast_ref::<LongRunningOperationOutput>()
702                                .and_then(|output| output.status())
703                        })
704                    });
705
706                if let Some(LongRunningOperationStatus::Running) = status {
707                    return RetryAction::server_error();
708                };
709
710                RetryAction::NoActionIndicated
711            }
712
713            fn name(&self) -> &'static str {
714                "waiter retry classifier"
715            }
716        }
717
718        let retry_config = RetryConfig::standard()
719            .with_use_static_exponential_base(true)
720            .with_max_attempts(5);
721
722        let rc = RuntimeComponentsBuilder::for_tests()
723            .with_retry_classifier(SharedRetryClassifier::new(WaiterRetryClassifier::new()))
724            .build()
725            .unwrap();
726        let mut layer = Layer::new("test");
727        layer.store_put(retry_config);
728        let mut cfg = ConfigBag::of_layers(vec![layer]);
729        let mut ctx = InterceptorContext::new(Input::doesnt_matter());
730        let strategy = StandardRetryStrategy::new();
731
732        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
733            status: Some(LongRunningOperationStatus::Running),
734        })));
735
736        cfg.interceptor_state().store_put(TokenBucket::new(5));
737        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
738
739        cfg.interceptor_state().store_put(RequestAttempts::new(1));
740        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
741        let dur = should_retry.expect_delay();
742        assert_eq!(dur, Duration::from_secs(1));
743        assert_eq!(token_bucket.available_permits(), 0);
744
745        ctx.set_output_or_error(Ok(Output::erase(LongRunningOperationOutput {
746            status: Some(LongRunningOperationStatus::Complete),
747        })));
748        cfg.interceptor_state().store_put(RequestAttempts::new(2));
749        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
750        should_retry.expect_no();
751        assert_eq!(token_bucket.available_permits(), 5);
752    }
753
754    #[cfg(feature = "test-util")]
755    #[test]
756    fn no_quota() {
757        let (mut cfg, rc, ctx) = setup_test(
758            vec![RetryAction::server_error()],
759            RetryConfig::standard()
760                .with_use_static_exponential_base(true)
761                .with_max_attempts(5),
762        );
763        let strategy = StandardRetryStrategy::new();
764        cfg.interceptor_state().store_put(TokenBucket::new(5));
765        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
766
767        cfg.interceptor_state().store_put(RequestAttempts::new(1));
768        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
769        let dur = should_retry.expect_delay();
770        assert_eq!(dur, Duration::from_secs(1));
771        assert_eq!(token_bucket.available_permits(), 0);
772
773        cfg.interceptor_state().store_put(RequestAttempts::new(2));
774        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
775        assert_eq!(no_retry, ShouldAttempt::No);
776        assert_eq!(token_bucket.available_permits(), 0);
777    }
778
779    #[cfg(feature = "test-util")]
780    #[test]
781    fn quota_replenishes_on_success() {
782        let (mut cfg, rc, mut ctx) = setup_test(
783            vec![
784                RetryAction::transient_error(),
785                RetryAction::retryable_error_with_explicit_delay(
786                    ErrorKind::TransientError,
787                    Duration::from_secs(1),
788                ),
789            ],
790            RetryConfig::standard()
791                .with_use_static_exponential_base(true)
792                .with_max_attempts(5),
793        );
794        let strategy = StandardRetryStrategy::new();
795        cfg.interceptor_state().store_put(TokenBucket::new(100));
796        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
797
798        cfg.interceptor_state().store_put(RequestAttempts::new(1));
799        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
800        let dur = should_retry.expect_delay();
801        assert_eq!(dur, Duration::from_secs(1));
802        assert_eq!(token_bucket.available_permits(), 90);
803
804        cfg.interceptor_state().store_put(RequestAttempts::new(2));
805        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
806        let dur = should_retry.expect_delay();
807        assert_eq!(dur, Duration::from_secs(1));
808        assert_eq!(token_bucket.available_permits(), 80);
809
810        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
811
812        cfg.interceptor_state().store_put(RequestAttempts::new(3));
813        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
814        assert_eq!(no_retry, ShouldAttempt::No);
815
816        assert_eq!(token_bucket.available_permits(), 90);
817    }
818
819    #[cfg(feature = "test-util")]
820    #[test]
821    fn quota_replenishes_on_first_try_success() {
822        const PERMIT_COUNT: usize = 20;
823        let (mut cfg, rc, mut ctx) = setup_test(
824            vec![RetryAction::transient_error()],
825            RetryConfig::standard()
826                .with_use_static_exponential_base(true)
827                .with_max_attempts(u32::MAX),
828        );
829        let strategy = StandardRetryStrategy::new();
830        cfg.interceptor_state()
831            .store_put(TokenBucket::new(PERMIT_COUNT));
832        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
833
834        let mut attempt = 1;
835
836        // Drain all available permits with failed attempts
837        while token_bucket.available_permits() > 0 {
838            // Draining should complete in 2 attempts
839            if attempt > 2 {
840                panic!("This test should have completed by now (drain)");
841            }
842
843            cfg.interceptor_state()
844                .store_put(RequestAttempts::new(attempt));
845            let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
846            assert!(matches!(should_retry, ShouldAttempt::YesAfterDelay(_)));
847            attempt += 1;
848        }
849
850        // Forget the permit so that we can only refill by "success on first try".
851        let permit = strategy.retry_permit.lock().unwrap().take().unwrap();
852        permit.forget();
853
854        ctx.set_output_or_error(Ok(Output::doesnt_matter()));
855
856        // Replenish permits until we get back to `PERMIT_COUNT`
857        while token_bucket.available_permits() < PERMIT_COUNT {
858            if attempt > 23 {
859                panic!("This test should have completed by now (fill-up)");
860            }
861
862            cfg.interceptor_state()
863                .store_put(RequestAttempts::new(attempt));
864            let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
865            assert_eq!(no_retry, ShouldAttempt::No);
866            attempt += 1;
867        }
868
869        assert_eq!(attempt, 23);
870        assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
871    }
872
873    #[cfg(feature = "test-util")]
874    #[test]
875    fn backoff_timing() {
876        let (mut cfg, rc, ctx) = setup_test(
877            vec![RetryAction::server_error()],
878            RetryConfig::standard()
879                .with_use_static_exponential_base(true)
880                .with_max_attempts(5),
881        );
882        let strategy = StandardRetryStrategy::new();
883        cfg.interceptor_state().store_put(TokenBucket::default());
884        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
885
886        cfg.interceptor_state().store_put(RequestAttempts::new(1));
887        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
888        let dur = should_retry.expect_delay();
889        assert_eq!(dur, Duration::from_secs(1));
890        assert_eq!(token_bucket.available_permits(), 495);
891
892        cfg.interceptor_state().store_put(RequestAttempts::new(2));
893        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
894        let dur = should_retry.expect_delay();
895        assert_eq!(dur, Duration::from_secs(2));
896        assert_eq!(token_bucket.available_permits(), 490);
897
898        cfg.interceptor_state().store_put(RequestAttempts::new(3));
899        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
900        let dur = should_retry.expect_delay();
901        assert_eq!(dur, Duration::from_secs(4));
902        assert_eq!(token_bucket.available_permits(), 485);
903
904        cfg.interceptor_state().store_put(RequestAttempts::new(4));
905        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
906        let dur = should_retry.expect_delay();
907        assert_eq!(dur, Duration::from_secs(8));
908        assert_eq!(token_bucket.available_permits(), 480);
909
910        cfg.interceptor_state().store_put(RequestAttempts::new(5));
911        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
912        assert_eq!(no_retry, ShouldAttempt::No);
913        assert_eq!(token_bucket.available_permits(), 480);
914    }
915
916    #[cfg(feature = "test-util")]
917    #[test]
918    fn max_backoff_time() {
919        let (mut cfg, rc, ctx) = setup_test(
920            vec![RetryAction::server_error()],
921            RetryConfig::standard()
922                .with_use_static_exponential_base(true)
923                .with_max_attempts(5)
924                .with_initial_backoff(Duration::from_secs(1))
925                .with_max_backoff(Duration::from_secs(3)),
926        );
927        let strategy = StandardRetryStrategy::new();
928        cfg.interceptor_state().store_put(TokenBucket::default());
929        let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
930
931        cfg.interceptor_state().store_put(RequestAttempts::new(1));
932        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
933        let dur = should_retry.expect_delay();
934        assert_eq!(dur, Duration::from_secs(1));
935        assert_eq!(token_bucket.available_permits(), 495);
936
937        cfg.interceptor_state().store_put(RequestAttempts::new(2));
938        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
939        let dur = should_retry.expect_delay();
940        assert_eq!(dur, Duration::from_secs(2));
941        assert_eq!(token_bucket.available_permits(), 490);
942
943        cfg.interceptor_state().store_put(RequestAttempts::new(3));
944        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
945        let dur = should_retry.expect_delay();
946        assert_eq!(dur, Duration::from_secs(3));
947        assert_eq!(token_bucket.available_permits(), 485);
948
949        cfg.interceptor_state().store_put(RequestAttempts::new(4));
950        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
951        let dur = should_retry.expect_delay();
952        assert_eq!(dur, Duration::from_secs(3));
953        assert_eq!(token_bucket.available_permits(), 480);
954
955        cfg.interceptor_state().store_put(RequestAttempts::new(5));
956        let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
957        assert_eq!(no_retry, ShouldAttempt::No);
958        assert_eq!(token_bucket.available_permits(), 480);
959    }
960
961    const MAX_BACKOFF: Duration = Duration::from_secs(20);
962
963    #[test]
964    fn calculate_exponential_backoff_where_initial_backoff_is_one() {
965        let initial_backoff = 1.0;
966
967        for (attempt, expected_backoff) in [initial_backoff, 2.0, 4.0].into_iter().enumerate() {
968            let actual_backoff =
969                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
970            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
971        }
972    }
973
974    #[test]
975    fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
976        let initial_backoff = 3.0;
977
978        for (attempt, expected_backoff) in [initial_backoff, 6.0, 12.0].into_iter().enumerate() {
979            let actual_backoff =
980                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
981            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
982        }
983    }
984
985    #[test]
986    fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
987        let initial_backoff = 0.03;
988
989        for (attempt, expected_backoff) in [initial_backoff, 0.06, 0.12].into_iter().enumerate() {
990            let actual_backoff =
991                calculate_exponential_backoff(1.0, initial_backoff, attempt as u32, MAX_BACKOFF);
992            assert_eq!(Duration::from_secs_f64(expected_backoff), actual_backoff);
993        }
994    }
995
996    #[test]
997    fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
998        // avoid overflow for a silly large amount of retry attempts
999        assert_eq!(
1000            MAX_BACKOFF,
1001            calculate_exponential_backoff(1_f64, 10_f64, 100000, MAX_BACKOFF),
1002        );
1003    }
1004}