aws_smithy_runtime/client/
defaults.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Runtime plugins that provide defaults for clients.
7//!
8//! Note: these are the absolute base-level defaults. They may not be the defaults
9//! for _your_ client, since many things can change these defaults on the way to
10//! code generating and constructing a full client.
11
12use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::standard::TokenBucketProvider;
15use crate::client::retries::strategy::StandardRetryStrategy;
16use crate::client::retries::RetryPartition;
17use aws_smithy_async::rt::sleep::default_async_sleep;
18use aws_smithy_async::time::SystemTimeSource;
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
21use aws_smithy_runtime_api::client::http::SharedHttpClient;
22use aws_smithy_runtime_api::client::runtime_components::{
23    RuntimeComponentsBuilder, SharedConfigValidator,
24};
25use aws_smithy_runtime_api::client::runtime_plugin::{
26    Order, SharedRuntimePlugin, StaticRuntimePlugin,
27};
28use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
29use aws_smithy_runtime_api::shared::IntoShared;
30use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
31use aws_smithy_types::retry::RetryConfig;
32use aws_smithy_types::timeout::TimeoutConfig;
33use std::borrow::Cow;
34use std::time::Duration;
35
36fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
37where
38    CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
39{
40    StaticRuntimePlugin::new()
41        .with_order(Order::Defaults)
42        .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
43}
44
45fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
46where
47    LayerFn: FnOnce(&mut Layer),
48{
49    let mut layer = Layer::new(name);
50    (layer_fn)(&mut layer);
51    layer.freeze()
52}
53
54/// Runtime plugin that provides a default connector.
55pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
56    let _default: Option<SharedHttpClient> = None;
57    #[cfg(feature = "connector-hyper-0-14-x")]
58    let _default = crate::client::http::hyper_014::default_client();
59
60    _default.map(|default| {
61        default_plugin("default_http_client_plugin", |components| {
62            components.with_http_client(Some(default))
63        })
64        .into_shared()
65    })
66}
67
68/// Runtime plugin that provides a default async sleep implementation.
69pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
70    default_async_sleep().map(|default| {
71        default_plugin("default_sleep_impl_plugin", |components| {
72            components.with_sleep_impl(Some(default))
73        })
74        .into_shared()
75    })
76}
77
78/// Runtime plugin that provides a default time source.
79pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
80    Some(
81        default_plugin("default_time_source_plugin", |components| {
82            components.with_time_source(Some(SystemTimeSource::new()))
83        })
84        .into_shared(),
85    )
86}
87
88/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
89pub fn default_retry_config_plugin(
90    default_partition_name: impl Into<Cow<'static, str>>,
91) -> Option<SharedRuntimePlugin> {
92    let retry_partition = RetryPartition::new(default_partition_name);
93    Some(
94        default_plugin("default_retry_config_plugin", |components| {
95            components
96                .with_retry_strategy(Some(StandardRetryStrategy::new()))
97                .with_config_validator(SharedConfigValidator::base_client_config_fn(
98                    validate_retry_config,
99                ))
100                .with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
101        })
102        .with_config(layer("default_retry_config", |layer| {
103            layer.store_put(RetryConfig::disabled());
104            layer.store_put(retry_partition);
105        }))
106        .into_shared(),
107    )
108}
109
110fn validate_retry_config(
111    components: &RuntimeComponentsBuilder,
112    cfg: &ConfigBag,
113) -> Result<(), BoxError> {
114    if let Some(retry_config) = cfg.load::<RetryConfig>() {
115        if retry_config.has_retry() && components.sleep_impl().is_none() {
116            Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
117                 the config, or disable timeouts.".into())
118        } else {
119            Ok(())
120        }
121    } else {
122        Err(
123            "The default retry config was removed, and no other config was put in its place."
124                .into(),
125        )
126    }
127}
128
129/// Runtime plugin that sets the default timeout config (no timeouts).
130pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
131    Some(
132        default_plugin("default_timeout_config_plugin", |components| {
133            components.with_config_validator(SharedConfigValidator::base_client_config_fn(
134                validate_timeout_config,
135            ))
136        })
137        .with_config(layer("default_timeout_config", |layer| {
138            layer.store_put(TimeoutConfig::disabled());
139        }))
140        .into_shared(),
141    )
142}
143
144fn validate_timeout_config(
145    components: &RuntimeComponentsBuilder,
146    cfg: &ConfigBag,
147) -> Result<(), BoxError> {
148    if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
149        if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
150            Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
151                 the config, or disable timeouts.".into())
152        } else {
153            Ok(())
154        }
155    } else {
156        Err(
157            "The default timeout config was removed, and no other config was put in its place."
158                .into(),
159        )
160    }
161}
162
163/// Runtime plugin that registers the default identity cache implementation.
164pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
165    Some(
166        default_plugin("default_identity_cache_plugin", |components| {
167            components.with_identity_cache(Some(IdentityCache::lazy().build()))
168        })
169        .into_shared(),
170    )
171}
172
173/// Runtime plugin that sets the default stalled stream protection config.
174///
175/// By default, when throughput falls below 1/Bs for more than 5 seconds, the
176/// stream is cancelled.
177#[deprecated(
178    since = "1.2.0",
179    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
180)]
181pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
182    #[allow(deprecated)]
183    default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
184}
185fn default_stalled_stream_protection_config_plugin_v2(
186    behavior_version: BehaviorVersion,
187) -> Option<SharedRuntimePlugin> {
188    Some(
189        default_plugin(
190            "default_stalled_stream_protection_config_plugin",
191            |components| {
192                components.with_config_validator(SharedConfigValidator::base_client_config_fn(
193                    validate_stalled_stream_protection_config,
194                ))
195            },
196        )
197        .with_config(layer("default_stalled_stream_protection_config", |layer| {
198            let mut config =
199                StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
200            // Before v2024_03_28, upload streams did not have stalled stream protection by default
201            if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
202                config = config.upload_enabled(false);
203            }
204            layer.store_put(config.build());
205        }))
206        .into_shared(),
207    )
208}
209
210fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
211    Some(EnforceContentLengthRuntimePlugin::new().into_shared())
212}
213
214fn validate_stalled_stream_protection_config(
215    components: &RuntimeComponentsBuilder,
216    cfg: &ConfigBag,
217) -> Result<(), BoxError> {
218    if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
219        if stalled_stream_protection_config.is_enabled() {
220            if components.sleep_impl().is_none() {
221                return Err(
222                    "An async sleep implementation is required for stalled stream protection to work. \
223                     Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
224            }
225
226            if components.time_source().is_none() {
227                return Err(
228                    "A time source is required for stalled stream protection to work.\
229                     Please provide a `time_source` on the config, or disable stalled stream protection.".into());
230            }
231        }
232
233        Ok(())
234    } else {
235        Err(
236            "The default stalled stream protection config was removed, and no other config was put in its place."
237                .into(),
238        )
239    }
240}
241
242/// Arguments for the [`default_plugins`] method.
243///
244/// This is a struct to enable adding new parameters in the future without breaking the API.
245#[non_exhaustive]
246#[derive(Debug, Default)]
247pub struct DefaultPluginParams {
248    retry_partition_name: Option<Cow<'static, str>>,
249    behavior_version: Option<BehaviorVersion>,
250}
251
252impl DefaultPluginParams {
253    /// Creates a new [`DefaultPluginParams`].
254    pub fn new() -> Self {
255        Default::default()
256    }
257
258    /// Sets the retry partition name.
259    pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
260        self.retry_partition_name = Some(name.into());
261        self
262    }
263
264    /// Sets the behavior major version.
265    pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
266        self.behavior_version = Some(version);
267        self
268    }
269}
270
271/// All default plugins.
272pub fn default_plugins(
273    params: DefaultPluginParams,
274) -> impl IntoIterator<Item = SharedRuntimePlugin> {
275    let behavior_version = params
276        .behavior_version
277        .unwrap_or_else(BehaviorVersion::latest);
278
279    [
280        default_http_client_plugin(),
281        default_identity_cache_plugin(),
282        default_retry_config_plugin(
283            params
284                .retry_partition_name
285                .expect("retry_partition_name is required"),
286        ),
287        default_sleep_impl_plugin(),
288        default_time_source_plugin(),
289        default_timeout_config_plugin(),
290        enforce_content_length_runtime_plugin(),
291        default_stalled_stream_protection_config_plugin_v2(behavior_version),
292    ]
293    .into_iter()
294    .flatten()
295    .collect::<Vec<SharedRuntimePlugin>>()
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;

    /// Plugin params for `version` with a placeholder retry partition name.
    fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
        DefaultPluginParams::new()
            .with_retry_partition_name("dontcare")
            .with_behavior_version(version)
    }

    /// Applies `plugins` as client plugins on top of a base config bag.
    fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
        let mut bag = ConfigBag::base();
        RuntimePlugins::new()
            .with_client_plugins(plugins)
            .apply_client_configuration(&mut bag)
            .unwrap();
        bag
    }

    /// Reads the upload flag of the stalled stream protection config stored in `bag`.
    fn upload_protection_enabled(bag: &ConfigBag) -> bool {
        bag.load::<StalledStreamProtectionConfig>()
            .unwrap()
            .upload_enabled()
    }

    #[test]
    #[allow(deprecated)]
    fn v2024_03_28_stalled_stream_protection_difference() {
        let latest = config_for(default_plugins(test_plugin_params(
            BehaviorVersion::latest(),
        )));
        let v2023 = config_for(default_plugins(test_plugin_params(
            BehaviorVersion::v2023_11_09(),
        )));

        assert!(
            upload_protection_enabled(&latest),
            "stalled stream protection on uploads MUST be enabled after v2024_03_28"
        );
        assert!(
            !upload_protection_enabled(&v2023),
            "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
        );
    }
}