aws_smithy_runtime/client/
timeout.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use aws_smithy_async::future::timeout::Timeout;
7use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep, Sleep};
8use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
9use aws_smithy_runtime_api::client::result::SdkError;
10use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
11use aws_smithy_types::config_bag::ConfigBag;
12use aws_smithy_types::timeout::TimeoutConfig;
13use pin_project_lite::pin_project;
14use std::future::Future;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use std::time::Duration;
18
19#[derive(Debug)]
20struct MaybeTimeoutError {
21    kind: TimeoutKind,
22    duration: Duration,
23}
24
25impl MaybeTimeoutError {
26    fn new(kind: TimeoutKind, duration: Duration) -> Self {
27        Self { kind, duration }
28    }
29}
30
31impl std::fmt::Display for MaybeTimeoutError {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(
34            f,
35            "{} occurred after {:?}",
36            match self.kind {
37                TimeoutKind::Operation => "operation timeout (all attempts including retries)",
38                TimeoutKind::OperationAttempt => "operation attempt timeout (single attempt)",
39            },
40            self.duration
41        )
42    }
43}
44
45impl std::error::Error for MaybeTimeoutError {}
46
47pin_project! {
48    #[non_exhaustive]
49    #[must_use = "futures do nothing unless you `.await` or poll them"]
50    // This allow is needed because otherwise Clippy will get mad we didn't document the
51    // generated MaybeTimeoutFutureProj
52    #[allow(missing_docs)]
53    #[project = MaybeTimeoutFutureProj]
54    /// A timeout future that may or may not have a timeout depending on
55    /// whether or not one was set. A `kind` can be set so that when a timeout occurs, there
56    /// is additional context attached to the error.
57    pub(super) enum MaybeTimeoutFuture<F> {
58        /// A wrapper around an inner future that will output an [`SdkError`] if it runs longer than
59        /// the given duration
60        Timeout {
61            #[pin]
62            future: Timeout<F, Sleep>,
63            timeout_kind: TimeoutKind,
64            duration: Duration,
65        },
66        /// A thin wrapper around an inner future that will never time out
67        NoTimeout {
68            #[pin]
69            future: F
70        }
71    }
72}
73
74impl<InnerFuture, T, E> Future for MaybeTimeoutFuture<InnerFuture>
75where
76    InnerFuture: Future<Output = Result<T, SdkError<E, HttpResponse>>>,
77{
78    type Output = Result<T, SdkError<E, HttpResponse>>;
79
80    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81        let (future, kind, duration) = match self.project() {
82            MaybeTimeoutFutureProj::NoTimeout { future } => return future.poll(cx),
83            MaybeTimeoutFutureProj::Timeout {
84                future,
85                timeout_kind,
86                duration,
87            } => (future, timeout_kind, duration),
88        };
89        match future.poll(cx) {
90            Poll::Ready(Ok(response)) => Poll::Ready(response),
91            Poll::Ready(Err(_timeout)) => Poll::Ready(Err(SdkError::timeout_error(
92                MaybeTimeoutError::new(*kind, *duration),
93            ))),
94            Poll::Pending => Poll::Pending,
95        }
96    }
97}
98
99#[derive(Copy, Clone, Debug, Eq, PartialEq)]
100pub(super) enum TimeoutKind {
101    Operation,
102    OperationAttempt,
103}
104
105#[derive(Clone, Debug)]
106pub(super) struct MaybeTimeoutConfig {
107    sleep_impl: Option<SharedAsyncSleep>,
108    timeout: Option<Duration>,
109    timeout_kind: TimeoutKind,
110}
111
112impl MaybeTimeoutConfig {
113    pub(super) fn new(
114        runtime_components: &RuntimeComponents,
115        cfg: &ConfigBag,
116        timeout_kind: TimeoutKind,
117    ) -> MaybeTimeoutConfig {
118        if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
119            let sleep_impl = runtime_components.sleep_impl();
120            let timeout = match (sleep_impl.as_ref(), timeout_kind) {
121                (None, _) => None,
122                (Some(_), TimeoutKind::Operation) => timeout_config.operation_timeout(),
123                (Some(_), TimeoutKind::OperationAttempt) => {
124                    timeout_config.operation_attempt_timeout()
125                }
126            };
127            MaybeTimeoutConfig {
128                sleep_impl,
129                timeout,
130                timeout_kind,
131            }
132        } else {
133            MaybeTimeoutConfig {
134                sleep_impl: None,
135                timeout: None,
136                timeout_kind,
137            }
138        }
139    }
140}
141
142/// Trait to conveniently wrap a future with an optional timeout.
143pub(super) trait MaybeTimeout<T>: Sized {
144    /// Wraps a future in a timeout if one is set.
145    fn maybe_timeout(self, timeout_config: MaybeTimeoutConfig) -> MaybeTimeoutFuture<Self>;
146}
147
148impl<T> MaybeTimeout<T> for T
149where
150    T: Future,
151{
152    fn maybe_timeout(self, timeout_config: MaybeTimeoutConfig) -> MaybeTimeoutFuture<Self> {
153        match timeout_config {
154            MaybeTimeoutConfig {
155                sleep_impl: Some(sleep_impl),
156                timeout: Some(timeout),
157                timeout_kind,
158            } => MaybeTimeoutFuture::Timeout {
159                future: Timeout::new(self, sleep_impl.sleep(timeout)),
160                timeout_kind,
161                duration: timeout,
162            },
163            _ => MaybeTimeoutFuture::NoTimeout { future: self },
164        }
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::future::never::Never;
    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep, TokioSleep};
    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
    use aws_smithy_runtime_api::client::result::SdkError;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use aws_smithy_types::config_bag::{CloneableLayer, ConfigBag};
    use aws_smithy_types::timeout::TimeoutConfig;
    use std::time::Duration;

    /// Builds an operation-level `MaybeTimeoutConfig` from test runtime components that use
    /// `sleep_impl` and a config bag holding `timeouts`.
    fn operation_timeout_config(
        sleep_impl: SharedAsyncSleep,
        timeouts: TimeoutConfig,
    ) -> MaybeTimeoutConfig {
        let runtime_components = RuntimeComponentsBuilder::for_tests()
            .with_sleep_impl(Some(sleep_impl))
            .build()
            .unwrap();

        let mut layer = CloneableLayer::new("timeout");
        layer.store_put(timeouts);
        let cfg = ConfigBag::of_layers(vec![layer.into()]);

        MaybeTimeoutConfig::new(&runtime_components, &cfg, TimeoutKind::Operation)
    }

    /// With no timeout configured, the wrapped future simply runs to completion.
    #[tokio::test]
    async fn test_no_timeout() {
        let sleep_impl = SharedAsyncSleep::new(TokioSleep::new());
        let quarter_second_sleep = sleep_impl.sleep(Duration::from_millis(250));
        let underlying_future = async {
            quarter_second_sleep.await;
            Ok::<(), SdkError<(), HttpResponse>>(())
        };

        // Record the start instant, then pause the clock so the test is driven by virtual time.
        let now = tokio::time::Instant::now();
        tokio::time::pause();

        let maybe_timeout = operation_timeout_config(sleep_impl, TimeoutConfig::builder().build());
        underlying_future
            .maybe_timeout(maybe_timeout)
            .await
            .expect("success");

        assert_elapsed!(now, Duration::from_secs_f32(0.25));
    }

    /// With an operation timeout configured, a future that never completes yields a timeout error.
    #[tokio::test]
    async fn test_operation_timeout() {
        let sleep_impl = SharedAsyncSleep::new(TokioSleep::new());
        let never = Never::new();
        let underlying_future = async {
            never.await;
            Ok::<(), SdkError<(), HttpResponse>>(())
        };

        // Record the start instant, then pause the clock so the test is driven by virtual time.
        let now = tokio::time::Instant::now();
        tokio::time::pause();

        let timeouts = TimeoutConfig::builder()
            .operation_timeout(Duration::from_millis(250))
            .build();
        let maybe_timeout = operation_timeout_config(sleep_impl, timeouts);

        let err = underlying_future
            .maybe_timeout(maybe_timeout)
            .await
            .expect_err("should have timed out");

        assert_eq!(format!("{:?}", err), "TimeoutError(TimeoutError { source: MaybeTimeoutError { kind: Operation, duration: 250ms } })");
        assert_elapsed!(now, Duration::from_secs_f32(0.25));
    }
}
244}