aws_smithy_runtime/client/
timeout.rs
1use aws_smithy_async::future::timeout::Timeout;
7use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep, Sleep};
8use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
9use aws_smithy_runtime_api::client::result::SdkError;
10use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
11use aws_smithy_types::config_bag::ConfigBag;
12use aws_smithy_types::timeout::TimeoutConfig;
13use pin_project_lite::pin_project;
14use std::future::Future;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17use std::time::Duration;
18
19#[derive(Debug)]
20struct MaybeTimeoutError {
21 kind: TimeoutKind,
22 duration: Duration,
23}
24
25impl MaybeTimeoutError {
26 fn new(kind: TimeoutKind, duration: Duration) -> Self {
27 Self { kind, duration }
28 }
29}
30
31impl std::fmt::Display for MaybeTimeoutError {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(
34 f,
35 "{} occurred after {:?}",
36 match self.kind {
37 TimeoutKind::Operation => "operation timeout (all attempts including retries)",
38 TimeoutKind::OperationAttempt => "operation attempt timeout (single attempt)",
39 },
40 self.duration
41 )
42 }
43}
44
45impl std::error::Error for MaybeTimeoutError {}
46
47pin_project! {
48 #[non_exhaustive]
49 #[must_use = "futures do nothing unless you `.await` or poll them"]
50 #[allow(missing_docs)]
53 #[project = MaybeTimeoutFutureProj]
54 pub(super) enum MaybeTimeoutFuture<F> {
58 Timeout {
61 #[pin]
62 future: Timeout<F, Sleep>,
63 timeout_kind: TimeoutKind,
64 duration: Duration,
65 },
66 NoTimeout {
68 #[pin]
69 future: F
70 }
71 }
72}
73
74impl<InnerFuture, T, E> Future for MaybeTimeoutFuture<InnerFuture>
75where
76 InnerFuture: Future<Output = Result<T, SdkError<E, HttpResponse>>>,
77{
78 type Output = Result<T, SdkError<E, HttpResponse>>;
79
80 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
81 let (future, kind, duration) = match self.project() {
82 MaybeTimeoutFutureProj::NoTimeout { future } => return future.poll(cx),
83 MaybeTimeoutFutureProj::Timeout {
84 future,
85 timeout_kind,
86 duration,
87 } => (future, timeout_kind, duration),
88 };
89 match future.poll(cx) {
90 Poll::Ready(Ok(response)) => Poll::Ready(response),
91 Poll::Ready(Err(_timeout)) => Poll::Ready(Err(SdkError::timeout_error(
92 MaybeTimeoutError::new(*kind, *duration),
93 ))),
94 Poll::Pending => Poll::Pending,
95 }
96 }
97}
98
99#[derive(Copy, Clone, Debug, Eq, PartialEq)]
100pub(super) enum TimeoutKind {
101 Operation,
102 OperationAttempt,
103}
104
105#[derive(Clone, Debug)]
106pub(super) struct MaybeTimeoutConfig {
107 sleep_impl: Option<SharedAsyncSleep>,
108 timeout: Option<Duration>,
109 timeout_kind: TimeoutKind,
110}
111
112impl MaybeTimeoutConfig {
113 pub(super) fn new(
114 runtime_components: &RuntimeComponents,
115 cfg: &ConfigBag,
116 timeout_kind: TimeoutKind,
117 ) -> MaybeTimeoutConfig {
118 if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
119 let sleep_impl = runtime_components.sleep_impl();
120 let timeout = match (sleep_impl.as_ref(), timeout_kind) {
121 (None, _) => None,
122 (Some(_), TimeoutKind::Operation) => timeout_config.operation_timeout(),
123 (Some(_), TimeoutKind::OperationAttempt) => {
124 timeout_config.operation_attempt_timeout()
125 }
126 };
127 MaybeTimeoutConfig {
128 sleep_impl,
129 timeout,
130 timeout_kind,
131 }
132 } else {
133 MaybeTimeoutConfig {
134 sleep_impl: None,
135 timeout: None,
136 timeout_kind,
137 }
138 }
139 }
140}
141
142pub(super) trait MaybeTimeout<T>: Sized {
144 fn maybe_timeout(self, timeout_config: MaybeTimeoutConfig) -> MaybeTimeoutFuture<Self>;
146}
147
148impl<T> MaybeTimeout<T> for T
149where
150 T: Future,
151{
152 fn maybe_timeout(self, timeout_config: MaybeTimeoutConfig) -> MaybeTimeoutFuture<Self> {
153 match timeout_config {
154 MaybeTimeoutConfig {
155 sleep_impl: Some(sleep_impl),
156 timeout: Some(timeout),
157 timeout_kind,
158 } => MaybeTimeoutFuture::Timeout {
159 future: Timeout::new(self, sleep_impl.sleep(timeout)),
160 timeout_kind,
161 duration: timeout,
162 },
163 _ => MaybeTimeoutFuture::NoTimeout { future: self },
164 }
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::future::never::Never;
    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep, TokioSleep};
    use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
    use aws_smithy_runtime_api::client::result::SdkError;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use aws_smithy_types::config_bag::{CloneableLayer, ConfigBag};
    use aws_smithy_types::timeout::TimeoutConfig;
    use std::time::Duration;

    /// Builds an operation-level `MaybeTimeoutConfig` from test runtime
    /// components that use `sleep_impl` and a config bag whose only layer
    /// stores `timeout_config`.
    fn operation_timeout_settings(
        sleep_impl: SharedAsyncSleep,
        timeout_config: TimeoutConfig,
    ) -> MaybeTimeoutConfig {
        let runtime_components = RuntimeComponentsBuilder::for_tests()
            .with_sleep_impl(Some(sleep_impl))
            .build()
            .unwrap();

        let mut layer = CloneableLayer::new("timeout");
        layer.store_put(timeout_config);
        let cfg = ConfigBag::of_layers(vec![layer.into()]);

        MaybeTimeoutConfig::new(&runtime_components, &cfg, TimeoutKind::Operation)
    }

    /// With no timeout configured, the wrapped future runs to completion and
    /// the elapsed time matches the inner 250ms sleep.
    #[tokio::test]
    async fn test_no_timeout() {
        let sleep_impl = SharedAsyncSleep::new(TokioSleep::new());
        // The inner sleep is created before the clock is paused, as in the
        // timing assertion below.
        let inner_sleep = sleep_impl.sleep(Duration::from_millis(250));
        let underlying_future = async {
            inner_sleep.await;
            Result::<_, SdkError<(), HttpResponse>>::Ok(())
        };

        let now = tokio::time::Instant::now();
        tokio::time::pause();

        let settings = operation_timeout_settings(sleep_impl, TimeoutConfig::builder().build());
        underlying_future
            .maybe_timeout(settings)
            .await
            .expect("success");

        assert_elapsed!(now, Duration::from_secs_f32(0.25));
    }

    /// With a 250ms operation timeout and an inner future that never
    /// completes, the wrapper yields a timeout error after 250ms.
    #[tokio::test]
    async fn test_operation_timeout() {
        let sleep_impl = SharedAsyncSleep::new(TokioSleep::new());
        let never = Never::new();
        let underlying_future = async {
            never.await;
            Result::<_, SdkError<(), HttpResponse>>::Ok(())
        };

        let now = tokio::time::Instant::now();
        tokio::time::pause();

        let settings = operation_timeout_settings(
            sleep_impl,
            TimeoutConfig::builder()
                .operation_timeout(Duration::from_millis(250))
                .build(),
        );
        let err = underlying_future
            .maybe_timeout(settings)
            .await
            .expect_err("should have timed out");

        assert_eq!(format!("{:?}", err), "TimeoutError(TimeoutError { source: MaybeTimeoutError { kind: Operation, duration: 250ms } })");
        assert_elapsed!(now, Duration::from_secs_f32(0.25));
    }
}