aws_smithy_runtime/client/
defaults.rs
1use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::standard::TokenBucketProvider;
15use crate::client::retries::strategy::StandardRetryStrategy;
16use crate::client::retries::RetryPartition;
17use aws_smithy_async::rt::sleep::default_async_sleep;
18use aws_smithy_async::time::SystemTimeSource;
19use aws_smithy_runtime_api::box_error::BoxError;
20use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
21use aws_smithy_runtime_api::client::http::SharedHttpClient;
22use aws_smithy_runtime_api::client::runtime_components::{
23 RuntimeComponentsBuilder, SharedConfigValidator,
24};
25use aws_smithy_runtime_api::client::runtime_plugin::{
26 Order, SharedRuntimePlugin, StaticRuntimePlugin,
27};
28use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
29use aws_smithy_runtime_api::shared::IntoShared;
30use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
31use aws_smithy_types::retry::RetryConfig;
32use aws_smithy_types::timeout::TimeoutConfig;
33use std::borrow::Cow;
34use std::time::Duration;
35
36fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
37where
38 CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
39{
40 StaticRuntimePlugin::new()
41 .with_order(Order::Defaults)
42 .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
43}
44
45fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
46where
47 LayerFn: FnOnce(&mut Layer),
48{
49 let mut layer = Layer::new(name);
50 (layer_fn)(&mut layer);
51 layer.freeze()
52}
53
54pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
56 let _default: Option<SharedHttpClient> = None;
57 #[cfg(feature = "connector-hyper-0-14-x")]
58 let _default = crate::client::http::hyper_014::default_client();
59
60 _default.map(|default| {
61 default_plugin("default_http_client_plugin", |components| {
62 components.with_http_client(Some(default))
63 })
64 .into_shared()
65 })
66}
67
68pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
70 default_async_sleep().map(|default| {
71 default_plugin("default_sleep_impl_plugin", |components| {
72 components.with_sleep_impl(Some(default))
73 })
74 .into_shared()
75 })
76}
77
78pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
80 Some(
81 default_plugin("default_time_source_plugin", |components| {
82 components.with_time_source(Some(SystemTimeSource::new()))
83 })
84 .into_shared(),
85 )
86}
87
88pub fn default_retry_config_plugin(
90 default_partition_name: impl Into<Cow<'static, str>>,
91) -> Option<SharedRuntimePlugin> {
92 let retry_partition = RetryPartition::new(default_partition_name);
93 Some(
94 default_plugin("default_retry_config_plugin", |components| {
95 components
96 .with_retry_strategy(Some(StandardRetryStrategy::new()))
97 .with_config_validator(SharedConfigValidator::base_client_config_fn(
98 validate_retry_config,
99 ))
100 .with_interceptor(TokenBucketProvider::new(retry_partition.clone()))
101 })
102 .with_config(layer("default_retry_config", |layer| {
103 layer.store_put(RetryConfig::disabled());
104 layer.store_put(retry_partition);
105 }))
106 .into_shared(),
107 )
108}
109
110fn validate_retry_config(
111 components: &RuntimeComponentsBuilder,
112 cfg: &ConfigBag,
113) -> Result<(), BoxError> {
114 if let Some(retry_config) = cfg.load::<RetryConfig>() {
115 if retry_config.has_retry() && components.sleep_impl().is_none() {
116 Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
117 the config, or disable timeouts.".into())
118 } else {
119 Ok(())
120 }
121 } else {
122 Err(
123 "The default retry config was removed, and no other config was put in its place."
124 .into(),
125 )
126 }
127}
128
129pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
131 Some(
132 default_plugin("default_timeout_config_plugin", |components| {
133 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
134 validate_timeout_config,
135 ))
136 })
137 .with_config(layer("default_timeout_config", |layer| {
138 layer.store_put(TimeoutConfig::disabled());
139 }))
140 .into_shared(),
141 )
142}
143
144fn validate_timeout_config(
145 components: &RuntimeComponentsBuilder,
146 cfg: &ConfigBag,
147) -> Result<(), BoxError> {
148 if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
149 if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
150 Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
151 the config, or disable timeouts.".into())
152 } else {
153 Ok(())
154 }
155 } else {
156 Err(
157 "The default timeout config was removed, and no other config was put in its place."
158 .into(),
159 )
160 }
161}
162
163pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
165 Some(
166 default_plugin("default_identity_cache_plugin", |components| {
167 components.with_identity_cache(Some(IdentityCache::lazy().build()))
168 })
169 .into_shared(),
170 )
171}
172
173#[deprecated(
178 since = "1.2.0",
179 note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
180)]
181pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
182 #[allow(deprecated)]
183 default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
184}
185fn default_stalled_stream_protection_config_plugin_v2(
186 behavior_version: BehaviorVersion,
187) -> Option<SharedRuntimePlugin> {
188 Some(
189 default_plugin(
190 "default_stalled_stream_protection_config_plugin",
191 |components| {
192 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
193 validate_stalled_stream_protection_config,
194 ))
195 },
196 )
197 .with_config(layer("default_stalled_stream_protection_config", |layer| {
198 let mut config =
199 StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
200 if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
202 config = config.upload_enabled(false);
203 }
204 layer.store_put(config.build());
205 }))
206 .into_shared(),
207 )
208}
209
210fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
211 Some(EnforceContentLengthRuntimePlugin::new().into_shared())
212}
213
214fn validate_stalled_stream_protection_config(
215 components: &RuntimeComponentsBuilder,
216 cfg: &ConfigBag,
217) -> Result<(), BoxError> {
218 if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
219 if stalled_stream_protection_config.is_enabled() {
220 if components.sleep_impl().is_none() {
221 return Err(
222 "An async sleep implementation is required for stalled stream protection to work. \
223 Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
224 }
225
226 if components.time_source().is_none() {
227 return Err(
228 "A time source is required for stalled stream protection to work.\
229 Please provide a `time_source` on the config, or disable stalled stream protection.".into());
230 }
231 }
232
233 Ok(())
234 } else {
235 Err(
236 "The default stalled stream protection config was removed, and no other config was put in its place."
237 .into(),
238 )
239 }
240}
241
242#[non_exhaustive]
246#[derive(Debug, Default)]
247pub struct DefaultPluginParams {
248 retry_partition_name: Option<Cow<'static, str>>,
249 behavior_version: Option<BehaviorVersion>,
250}
251
252impl DefaultPluginParams {
253 pub fn new() -> Self {
255 Default::default()
256 }
257
258 pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
260 self.retry_partition_name = Some(name.into());
261 self
262 }
263
264 pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
266 self.behavior_version = Some(version);
267 self
268 }
269}
270
271pub fn default_plugins(
273 params: DefaultPluginParams,
274) -> impl IntoIterator<Item = SharedRuntimePlugin> {
275 let behavior_version = params
276 .behavior_version
277 .unwrap_or_else(BehaviorVersion::latest);
278
279 [
280 default_http_client_plugin(),
281 default_identity_cache_plugin(),
282 default_retry_config_plugin(
283 params
284 .retry_partition_name
285 .expect("retry_partition_name is required"),
286 ),
287 default_sleep_impl_plugin(),
288 default_time_source_plugin(),
289 default_timeout_config_plugin(),
290 enforce_content_length_runtime_plugin(),
291 default_stalled_stream_protection_config_plugin_v2(behavior_version),
292 ]
293 .into_iter()
294 .flatten()
295 .collect::<Vec<SharedRuntimePlugin>>()
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
302
303 fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
304 DefaultPluginParams::new()
305 .with_behavior_version(version)
306 .with_retry_partition_name("dontcare")
307 }
308 fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
309 let mut config = ConfigBag::base();
310 let plugins = RuntimePlugins::new().with_client_plugins(plugins);
311 plugins.apply_client_configuration(&mut config).unwrap();
312 config
313 }
314
315 #[test]
316 #[allow(deprecated)]
317 fn v2024_03_28_stalled_stream_protection_difference() {
318 let latest = config_for(default_plugins(test_plugin_params(
319 BehaviorVersion::latest(),
320 )));
321 let v2023 = config_for(default_plugins(test_plugin_params(
322 BehaviorVersion::v2023_11_09(),
323 )));
324
325 assert!(
326 latest
327 .load::<StalledStreamProtectionConfig>()
328 .unwrap()
329 .upload_enabled(),
330 "stalled stream protection on uploads MUST be enabled after v2024_03_28"
331 );
332 assert!(
333 !v2023
334 .load::<StalledStreamProtectionConfig>()
335 .unwrap()
336 .upload_enabled(),
337 "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
338 );
339 }
340}