aws_smithy_runtime_api/client/
orchestrator.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Client request orchestration.
7//!
8//! The orchestrator handles the full request/response lifecycle including:
9//! - Request serialization
10//! - Endpoint resolution
11//! - Identity resolution
12//! - Signing
13//! - Request transmission with retry and timeouts
14//! - Response deserialization
15//!
16//! There are several hook points in the orchestration where [interceptors](crate::client::interceptors)
17//! can read and modify the input, request, response, or output/error.
18
19use crate::box_error::BoxError;
20use crate::client::interceptors::context::phase::Phase;
21use crate::client::interceptors::context::Error;
22use crate::client::interceptors::InterceptorError;
23use crate::client::result::{ConnectorError, SdkError};
24use aws_smithy_types::config_bag::{Storable, StoreReplace};
25use bytes::Bytes;
26use std::borrow::Cow;
27use std::error::Error as StdError;
28use std::fmt;
29
30/// Type alias for the HTTP request type that the orchestrator uses.
31pub type HttpRequest = crate::http::Request;
32
33/// Type alias for the HTTP response type that the orchestrator uses.
34pub type HttpResponse = crate::http::Response;
35
36/// Informs the orchestrator on whether or not the request body needs to be loaded into memory before transmit.
37///
38/// This enum gets placed into the `ConfigBag` to change the orchestrator behavior.
39/// Immediately after serialization (before the `read_after_serialization` interceptor hook),
40/// if it was set to `Requested` in the config bag, it will be replaced back into the config bag as
41/// `Loaded` with the request body contents for use in later interceptors.
42///
43/// This all happens before the attempt loop, so the loaded request body will remain available
44/// for interceptors that run in any subsequent retry attempts.
45#[non_exhaustive]
46#[derive(Clone, Debug)]
47pub enum LoadedRequestBody {
48    /// Don't attempt to load the request body into memory.
49    NotNeeded,
50    /// Attempt to load the request body into memory.
51    Requested,
52    /// The request body is already loaded.
53    Loaded(Bytes),
54}
55
56impl Storable for LoadedRequestBody {
57    type Storer = StoreReplace<Self>;
58}
59
60/// Marker type stored in the config bag to indicate that a response body should be redacted.
61#[derive(Debug)]
62pub struct SensitiveOutput;
63
64impl Storable for SensitiveOutput {
65    type Storer = StoreReplace<Self>;
66}
67
68#[derive(Debug)]
69enum ErrorKind<E> {
70    /// An error occurred within an interceptor.
71    Interceptor { source: InterceptorError },
72    /// An error returned by a service.
73    Operation { err: E },
74    /// An error that occurs when a request times out.
75    Timeout { source: BoxError },
76    /// An error that occurs when request dispatch fails.
77    Connector { source: ConnectorError },
78    /// An error that occurs when a response can't be deserialized.
79    Response { source: BoxError },
80    /// A general orchestrator error.
81    Other { source: BoxError },
82}
83
84/// Errors that can occur while running the orchestrator.
85#[derive(Debug)]
86pub struct OrchestratorError<E> {
87    kind: ErrorKind<E>,
88}
89
90impl<E> OrchestratorError<E> {
91    /// Create a new `OrchestratorError` from the given source.
92    pub fn other(source: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>) -> Self {
93        Self {
94            kind: ErrorKind::Other {
95                source: source.into(),
96            },
97        }
98    }
99
100    /// Create an operation error.
101    pub fn operation(err: E) -> Self {
102        Self {
103            kind: ErrorKind::Operation { err },
104        }
105    }
106
107    /// True if the underlying error is an operation error.
108    pub fn is_operation_error(&self) -> bool {
109        matches!(self.kind, ErrorKind::Operation { .. })
110    }
111
112    /// Return this orchestrator error as an operation error if possible.
113    pub fn as_operation_error(&self) -> Option<&E> {
114        match &self.kind {
115            ErrorKind::Operation { err } => Some(err),
116            _ => None,
117        }
118    }
119
120    /// Create an interceptor error with the given source.
121    pub fn interceptor(source: InterceptorError) -> Self {
122        Self {
123            kind: ErrorKind::Interceptor { source },
124        }
125    }
126
127    /// True if the underlying error is an interceptor error.
128    pub fn is_interceptor_error(&self) -> bool {
129        matches!(self.kind, ErrorKind::Interceptor { .. })
130    }
131
132    /// Create a timeout error with the given source.
133    pub fn timeout(source: BoxError) -> Self {
134        Self {
135            kind: ErrorKind::Timeout { source },
136        }
137    }
138
139    /// True if the underlying error is a timeout error.
140    pub fn is_timeout_error(&self) -> bool {
141        matches!(self.kind, ErrorKind::Timeout { .. })
142    }
143
144    /// Create a response error with the given source.
145    pub fn response(source: BoxError) -> Self {
146        Self {
147            kind: ErrorKind::Response { source },
148        }
149    }
150
151    /// True if the underlying error is a response error.
152    pub fn is_response_error(&self) -> bool {
153        matches!(self.kind, ErrorKind::Response { .. })
154    }
155
156    /// Create a connector error with the given source.
157    pub fn connector(source: ConnectorError) -> Self {
158        Self {
159            kind: ErrorKind::Connector { source },
160        }
161    }
162
163    /// True if the underlying error is a [`ConnectorError`].
164    pub fn is_connector_error(&self) -> bool {
165        matches!(self.kind, ErrorKind::Connector { .. })
166    }
167
168    /// Return this orchestrator error as a connector error if possible.
169    pub fn as_connector_error(&self) -> Option<&ConnectorError> {
170        match &self.kind {
171            ErrorKind::Connector { source } => Some(source),
172            _ => None,
173        }
174    }
175
176    /// Convert the `OrchestratorError` into an [`SdkError`].
177    pub(crate) fn into_sdk_error(
178        self,
179        phase: &Phase,
180        response: Option<HttpResponse>,
181    ) -> SdkError<E, HttpResponse> {
182        match self.kind {
183            ErrorKind::Interceptor { source } => {
184                use Phase::*;
185                match phase {
186                    BeforeSerialization | Serialization => SdkError::construction_failure(source),
187                    BeforeTransmit | Transmit => match response {
188                        Some(response) => SdkError::response_error(source, response),
189                        None => {
190                            SdkError::dispatch_failure(ConnectorError::other(source.into(), None))
191                        }
192                    },
193                    BeforeDeserialization | Deserialization | AfterDeserialization => {
194                        SdkError::response_error(source, response.expect("phase has a response"))
195                    }
196                }
197            }
198            ErrorKind::Operation { err } => {
199                debug_assert!(phase.is_after_deserialization(), "operation errors are a result of successfully receiving and parsing a response from the server. Therefore, we must be in the 'After Deserialization' phase.");
200                SdkError::service_error(err, response.expect("phase has a response"))
201            }
202            ErrorKind::Connector { source } => SdkError::dispatch_failure(source),
203            ErrorKind::Timeout { source } => SdkError::timeout_error(source),
204            ErrorKind::Response { source } => SdkError::response_error(source, response.unwrap()),
205            ErrorKind::Other { source } => {
206                use Phase::*;
207                match phase {
208                    BeforeSerialization | Serialization => SdkError::construction_failure(source),
209                    BeforeTransmit | Transmit => convert_dispatch_error(source, response),
210                    BeforeDeserialization | Deserialization | AfterDeserialization => {
211                        SdkError::response_error(source, response.expect("phase has a response"))
212                    }
213                }
214            }
215        }
216    }
217
218    /// Maps the error type in `ErrorKind::Operation`
219    pub fn map_operation_error<E2>(self, map: impl FnOnce(E) -> E2) -> OrchestratorError<E2> {
220        let kind = match self.kind {
221            ErrorKind::Connector { source } => ErrorKind::Connector { source },
222            ErrorKind::Operation { err } => ErrorKind::Operation { err: map(err) },
223            ErrorKind::Interceptor { source } => ErrorKind::Interceptor { source },
224            ErrorKind::Response { source } => ErrorKind::Response { source },
225            ErrorKind::Timeout { source } => ErrorKind::Timeout { source },
226            ErrorKind::Other { source } => ErrorKind::Other { source },
227        };
228        OrchestratorError { kind }
229    }
230}
231
232impl<E> StdError for OrchestratorError<E>
233where
234    E: StdError + 'static,
235{
236    fn source(&self) -> Option<&(dyn StdError + 'static)> {
237        Some(match &self.kind {
238            ErrorKind::Connector { source } => source as _,
239            ErrorKind::Operation { err } => err as _,
240            ErrorKind::Interceptor { source } => source as _,
241            ErrorKind::Response { source } => source.as_ref(),
242            ErrorKind::Timeout { source } => source.as_ref(),
243            ErrorKind::Other { source } => source.as_ref(),
244        })
245    }
246}
247
248impl<E> fmt::Display for OrchestratorError<E> {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        f.write_str(match self.kind {
251            ErrorKind::Connector { .. } => "connector error",
252            ErrorKind::Operation { .. } => "operation error",
253            ErrorKind::Interceptor { .. } => "interceptor error",
254            ErrorKind::Response { .. } => "response error",
255            ErrorKind::Timeout { .. } => "timeout",
256            ErrorKind::Other { .. } => "an unknown error occurred",
257        })
258    }
259}
260
261fn convert_dispatch_error<O>(
262    err: BoxError,
263    response: Option<HttpResponse>,
264) -> SdkError<O, HttpResponse> {
265    let err = match err.downcast::<ConnectorError>() {
266        Ok(connector_error) => {
267            return SdkError::dispatch_failure(*connector_error);
268        }
269        Err(e) => e,
270    };
271    match response {
272        Some(response) => SdkError::response_error(err, response),
273        None => SdkError::dispatch_failure(ConnectorError::other(err, None)),
274    }
275}
276
277impl<E> From<InterceptorError> for OrchestratorError<E>
278where
279    E: fmt::Debug + std::error::Error + 'static,
280{
281    fn from(err: InterceptorError) -> Self {
282        Self::interceptor(err)
283    }
284}
285
286impl From<Error> for OrchestratorError<Error> {
287    fn from(err: Error) -> Self {
288        Self::operation(err)
289    }
290}
291
292/// Metadata added to the [`ConfigBag`](aws_smithy_types::config_bag::ConfigBag) that identifies the API being called.
293#[derive(Clone, Debug)]
294pub struct Metadata {
295    operation: Cow<'static, str>,
296    service: Cow<'static, str>,
297}
298
299impl Metadata {
300    /// Returns the operation name.
301    pub fn name(&self) -> &str {
302        &self.operation
303    }
304
305    /// Returns the service name.
306    pub fn service(&self) -> &str {
307        &self.service
308    }
309
310    /// Creates [`Metadata`].
311    pub fn new(
312        operation: impl Into<Cow<'static, str>>,
313        service: impl Into<Cow<'static, str>>,
314    ) -> Self {
315        Metadata {
316            operation: operation.into(),
317            service: service.into(),
318        }
319    }
320}
321
322impl Storable for Metadata {
323    type Storer = StoreReplace<Self>;
324}