aws_smithy_runtime/client/http/
hyper_014.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6use crate::client::http::connection_poisoning::CaptureSmithyConnection;
7use crate::client::http::hyper_014::timeout_middleware::HttpTimeoutError;
8use aws_smithy_async::future::timeout::TimedOutError;
9use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
10use aws_smithy_runtime_api::box_error::BoxError;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15    SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20    RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
#[cfg(feature = "tls-rustls")]
mod default_connector {
    use aws_smithy_async::rt::sleep::SharedAsyncSleep;
    use aws_smithy_runtime_api::client::http::HttpConnectorSettings;

    /// The rustls-backed connector type produced by this module.
    type RustlsConnector = hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>;

    // Creating a `with_native_roots` HTTP client takes 300ms on OS X, so the connector is
    // built lazily a single time and cloned afterwards rather than rebuilt on each use.
    pub(crate) static HTTPS_NATIVE_ROOTS: once_cell::sync::Lazy<RustlsConnector> =
        once_cell::sync::Lazy::new(default_tls);

    /// Builds the rustls connector stored in [`HTTPS_NATIVE_ROOTS`].
    ///
    /// The TLS configuration uses an explicit cipher suite list, native root certificates,
    /// and no client authentication. The resulting connector accepts both `http` and `https`
    /// URLs and has HTTP/1 and HTTP/2 enabled.
    fn default_tls() -> RustlsConnector {
        use hyper_rustls::ConfigBuilderExt;

        let tls_config = rustls::ClientConfig::builder()
            .with_cipher_suites(&[
                // TLS1.3 suites
                rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
                // TLS1.2 suites
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
            ])
            .with_safe_default_kx_groups()
            .with_safe_default_protocol_versions()
            .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
            .with_native_roots()
            .with_no_client_auth();

        hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(tls_config)
            .https_or_http()
            .enable_http1()
            .enable_http2()
            .build()
    }

    /// Starts a [`HyperConnectorBuilder`](super::HyperConnectorBuilder) from the given
    /// connector settings, attaching `sleep` as the sleep implementation when one is provided.
    pub(super) fn base(
        settings: &HttpConnectorSettings,
        sleep: Option<SharedAsyncSleep>,
    ) -> super::HyperConnectorBuilder {
        let builder = super::HyperConnector::builder().connector_settings(settings.clone());
        match sleep {
            Some(sleep) => builder.sleep_impl(sleep),
            None => builder,
        }
    }

    /// Return a default HTTPS connector backed by the `rustls` crate.
    ///
    /// It requires a minimum TLS version of 1.2.
    /// It allows you to connect to both `http` and `https` URLs.
    ///
    /// Each call hands out a clone of the lazily-initialized [`HTTPS_NATIVE_ROOTS`] connector.
    pub(super) fn https() -> RustlsConnector {
        HTTPS_NATIVE_ROOTS.clone()
    }
}
95
96/// Given `HttpConnectorSettings` and an `SharedAsyncSleep`, create a `SharedHttpConnector` from defaults depending on what cargo features are activated.
97pub fn default_connector(
98    settings: &HttpConnectorSettings,
99    sleep: Option<SharedAsyncSleep>,
100) -> Option<SharedHttpConnector> {
101    #[cfg(feature = "tls-rustls")]
102    {
103        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
104        let hyper = default_connector::base(settings, sleep).build_https();
105        Some(SharedHttpConnector::new(hyper))
106    }
107    #[cfg(not(feature = "tls-rustls"))]
108    {
109        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
110        None
111    }
112}
113
114/// Creates a hyper-backed HTTPS client from defaults depending on what cargo features are activated.
115pub fn default_client() -> Option<SharedHttpClient> {
116    #[cfg(feature = "tls-rustls")]
117    {
118        tracing::trace!("creating a new default hyper 0.14.x client");
119        Some(HyperClientBuilder::new().build_https())
120    }
121    #[cfg(not(feature = "tls-rustls"))]
122    {
123        tracing::trace!("no default connector available");
124        None
125    }
126}
127
128/// [`HttpConnector`] that uses [`hyper_0_14`] to make HTTP requests.
129///
130/// This connector also implements socket connect and read timeouts.
131///
132/// This shouldn't be used directly in most cases.
133/// See the docs on [`HyperClientBuilder`] for examples of how
134/// to customize the Hyper client.
135#[derive(Debug)]
136pub struct HyperConnector {
137    adapter: Box<dyn HttpConnector>,
138}
139
140impl HyperConnector {
141    /// Builder for a Hyper connector.
142    pub fn builder() -> HyperConnectorBuilder {
143        Default::default()
144    }
145}
146
147impl HttpConnector for HyperConnector {
148    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
149        self.adapter.call(request)
150    }
151}
152
153/// Builder for [`HyperConnector`].
154#[derive(Default, Debug)]
155pub struct HyperConnectorBuilder {
156    connector_settings: Option<HttpConnectorSettings>,
157    sleep_impl: Option<SharedAsyncSleep>,
158    client_builder: Option<hyper_0_14::client::Builder>,
159}
160
161impl HyperConnectorBuilder {
162    /// Create a [`HyperConnector`] from this builder and a given connector.
163    pub fn build<C>(self, tcp_connector: C) -> HyperConnector
164    where
165        C: Clone + Send + Sync + 'static,
166        C: hyper_0_14::service::Service<http_02x::Uri>,
167        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
168        C::Future: Unpin + Send + 'static,
169        C::Error: Into<BoxError>,
170    {
171        let client_builder = self.client_builder.unwrap_or_default();
172        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
173        let (connect_timeout, read_timeout) = self
174            .connector_settings
175            .map(|c| (c.connect_timeout(), c.read_timeout()))
176            .unwrap_or((None, None));
177
178        let connector = match connect_timeout {
179            Some(duration) => timeout_middleware::ConnectTimeout::new(
180                tcp_connector,
181                sleep_impl
182                    .clone()
183                    .expect("a sleep impl must be provided in order to have a connect timeout"),
184                duration,
185            ),
186            None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
187        };
188        let base = client_builder.build(connector);
189        let read_timeout = match read_timeout {
190            Some(duration) => timeout_middleware::HttpReadTimeout::new(
191                base,
192                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
193                duration,
194            ),
195            None => timeout_middleware::HttpReadTimeout::no_timeout(base),
196        };
197        HyperConnector {
198            adapter: Box::new(Adapter {
199                client: read_timeout,
200            }),
201        }
202    }
203
204    /// Create a [`HyperConnector`] with the default rustls HTTPS implementation.
205    #[cfg(feature = "tls-rustls")]
206    pub fn build_https(self) -> HyperConnector {
207        self.build(default_connector::https())
208    }
209
210    /// Set the async sleep implementation used for timeouts
211    ///
212    /// Calling this is only necessary for testing or to use something other than
213    /// [`default_async_sleep`].
214    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
215        self.sleep_impl = Some(sleep_impl.into_shared());
216        self
217    }
218
219    /// Set the async sleep implementation used for timeouts
220    ///
221    /// Calling this is only necessary for testing or to use something other than
222    /// [`default_async_sleep`].
223    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
224        self.sleep_impl = sleep_impl;
225        self
226    }
227
228    /// Configure the HTTP settings for the `HyperAdapter`
229    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
230        self.connector_settings = Some(connector_settings);
231        self
232    }
233
234    /// Configure the HTTP settings for the `HyperAdapter`
235    pub fn set_connector_settings(
236        &mut self,
237        connector_settings: Option<HttpConnectorSettings>,
238    ) -> &mut Self {
239        self.connector_settings = connector_settings;
240        self
241    }
242
243    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
244    ///
245    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
246    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
247        self.client_builder = Some(hyper_builder);
248        self
249    }
250
251    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
252    ///
253    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
254    pub fn set_hyper_builder(
255        &mut self,
256        hyper_builder: Option<hyper_0_14::client::Builder>,
257    ) -> &mut Self {
258        self.client_builder = hyper_builder;
259        self
260    }
261}
262
263/// Adapter from a [`hyper_0_14::Client`] to [`HttpConnector`].
264///
265/// This adapter also enables TCP `CONNECT` and HTTP `READ` timeouts via [`HyperConnector::builder`].
266struct Adapter<C> {
267    client: timeout_middleware::HttpReadTimeout<
268        hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
269    >,
270}
271
272impl<C> fmt::Debug for Adapter<C> {
273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274        f.debug_struct("Adapter")
275            .field("client", &"** hyper client **")
276            .finish()
277    }
278}
279
280/// Extract a smithy connection from a hyper CaptureConnection
281fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
282    let capture_conn = capture_conn.clone();
283    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
284        let mut extensions = http_02x::Extensions::new();
285        conn.get_extras(&mut extensions);
286        let http_info = extensions.get::<HttpInfo>();
287        let mut builder = ConnectionMetadata::builder()
288            .proxied(conn.is_proxied())
289            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
290                Some(conn) => conn.poison(),
291                None => tracing::trace!("no connection existed to poison"),
292            });
293
294        builder
295            .set_local_addr(http_info.map(|info| info.local_addr()))
296            .set_remote_addr(http_info.map(|info| info.remote_addr()));
297
298        let smithy_connection = builder.build();
299
300        Some(smithy_connection)
301    } else {
302        None
303    }
304}
305
306impl<C> HttpConnector for Adapter<C>
307where
308    C: Clone + Send + Sync + 'static,
309    C: hyper_0_14::service::Service<http_02x::Uri>,
310    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
311    C::Future: Unpin + Send + 'static,
312    C::Error: Into<BoxError>,
313{
314    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
315        use hyper_0_14::service::Service;
316
317        let mut request = match request.try_into_http02x() {
318            Ok(request) => request,
319            Err(err) => {
320                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
321            }
322        };
323        let capture_connection = capture_connection(&mut request);
324        if let Some(capture_smithy_connection) =
325            request.extensions().get::<CaptureSmithyConnection>()
326        {
327            capture_smithy_connection
328                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
329        }
330        let mut client = self.client.clone();
331        let fut = client.call(request);
332        HttpConnectorFuture::new(async move {
333            let response = fut
334                .await
335                .map_err(downcast_error)?
336                .map(SdkBody::from_body_0_4);
337            match HttpResponse::try_from(response) {
338                Ok(response) => Ok(response),
339                Err(err) => Err(ConnectorError::other(err.into(), None)),
340            }
341        })
342    }
343}
344
345/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
346fn downcast_error(err: BoxError) -> ConnectorError {
347    // is a `TimedOutError` (from aws_smithy_async::timeout) in the chain? if it is, this is a timeout
348    if find_source::<TimedOutError>(err.as_ref()).is_some() {
349        return ConnectorError::timeout(err);
350    }
351    // is the top of chain error actually already a `ConnectorError`? return that directly
352    let err = match err.downcast::<ConnectorError>() {
353        Ok(connector_error) => return *connector_error,
354        Err(box_error) => box_error,
355    };
356    // generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
357    // error classifications
358    let err = match err.downcast::<hyper_0_14::Error>() {
359        Ok(hyper_error) => return to_connector_error(*hyper_error),
360        Err(box_error) => box_error,
361    };
362
363    // otherwise, we have no idea!
364    ConnectorError::other(err, None)
365}
366
367/// Convert a [`hyper_0_14::Error`] into a [`ConnectorError`]
368fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
369    if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
370        return ConnectorError::timeout(err.into());
371    }
372    if err.is_user() {
373        return ConnectorError::user(err.into());
374    }
375    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
376        return ConnectorError::io(err.into());
377    }
378    // We sometimes receive this from S3: hyper::Error(IncompleteMessage)
379    if err.is_incomplete_message() {
380        return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
381    }
382    if let Some(h2_err) = find_source::<h2::Error>(&err) {
383        if h2_err.is_go_away()
384            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
385        {
386            return ConnectorError::io(err.into());
387        }
388    }
389
390    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
391    ConnectorError::other(err.into(), None)
392}
393
394fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
395    let mut next = Some(err);
396    while let Some(err) = next {
397        if let Some(matching_err) = err.downcast_ref::<E>() {
398            return Some(matching_err);
399        }
400        next = err.source();
401    }
402    None
403}
404
/// Key for the connector cache: the pair of timeouts read from the connector settings.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct CacheKey {
    /// Connect timeout from the settings, if any.
    connect_timeout: Option<Duration>,
    /// Read timeout from the settings, if any.
    read_timeout: Option<Duration>,
}
410
411impl From<&HttpConnectorSettings> for CacheKey {
412    fn from(value: &HttpConnectorSettings) -> Self {
413        Self {
414            connect_timeout: value.connect_timeout(),
415            read_timeout: value.read_timeout(),
416        }
417    }
418}
419
420struct HyperClient<F> {
421    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
422    client_builder: hyper_0_14::client::Builder,
423    tcp_connector_fn: F,
424}
425
426impl<F> fmt::Debug for HyperClient<F> {
427    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428        f.debug_struct("HyperClient")
429            .field("connector_cache", &self.connector_cache)
430            .field("client_builder", &self.client_builder)
431            .finish()
432    }
433}
434
435impl<C, F> HttpClient for HyperClient<F>
436where
437    F: Fn() -> C + Send + Sync,
438    C: Clone + Send + Sync + 'static,
439    C: hyper_0_14::service::Service<http_02x::Uri>,
440    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
441    C::Future: Unpin + Send + 'static,
442    C::Error: Into<BoxError>,
443{
444    fn http_connector(
445        &self,
446        settings: &HttpConnectorSettings,
447        components: &RuntimeComponents,
448    ) -> SharedHttpConnector {
449        let key = CacheKey::from(settings);
450        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
451        if connector.is_none() {
452            let mut cache = self.connector_cache.write().unwrap();
453            // Short-circuit if another thread already wrote a connector to the cache for this key
454            if !cache.contains_key(&key) {
455                let mut builder = HyperConnector::builder()
456                    .hyper_builder(self.client_builder.clone())
457                    .connector_settings(settings.clone());
458                builder.set_sleep_impl(components.sleep_impl());
459
460                let start = components.time_source().map(|ts| ts.now());
461                let tcp_connector = (self.tcp_connector_fn)();
462                let end = components.time_source().map(|ts| ts.now());
463                if let (Some(start), Some(end)) = (start, end) {
464                    if let Ok(elapsed) = end.duration_since(start) {
465                        tracing::debug!("new TCP connector created in {:?}", elapsed);
466                    }
467                }
468                let connector = SharedHttpConnector::new(builder.build(tcp_connector));
469                cache.insert(key.clone(), connector);
470            }
471            connector = cache.get(&key).cloned();
472        }
473
474        connector.expect("cache populated above")
475    }
476
477    fn validate_base_client_config(
478        &self,
479        _: &RuntimeComponentsBuilder,
480        _: &ConfigBag,
481    ) -> Result<(), BoxError> {
482        // Initialize the TCP connector at this point so that native certs load
483        // at client initialization time instead of upon first request. We do it
484        // here rather than at construction so that it won't run if this is not
485        // the selected HTTP client for the base config (for example, if this was
486        // the default HTTP client, and it was overridden by a later plugin).
487        let _ = (self.tcp_connector_fn)();
488        Ok(())
489    }
490
491    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
492        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
493    }
494}
495
496/// Builder for a hyper-backed [`HttpClient`] implementation.
497///
498/// This builder can be used to customize the underlying TCP connector used, as well as
499/// hyper client configuration.
500///
501/// # Examples
502///
503/// Construct a Hyper client with the default TLS implementation (rustls).
504/// This can be useful when you want to share a Hyper connector between multiple
505/// generated Smithy clients.
506///
507/// ```no_run,ignore
508/// use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
509///
510/// let http_client = HyperClientBuilder::new().build_https();
511///
512/// // This connector can then be given to a generated service Config
513/// let config = my_service_client::Config::builder()
514///     .endpoint_url("http://localhost:1234")
515///     .http_client(http_client)
516///     .build();
517/// let client = my_service_client::Client::from_conf(config);
518/// ```
519///
520/// ## Use a Hyper client with WebPKI roots
521///
522/// A use case for where you may want to use the [`HyperClientBuilder`] is when
523/// setting Hyper client settings that aren't otherwise exposed by the `Config`
524/// builder interface. Some examples include changing:
525///
526/// - Hyper client settings
527/// - Allowed TLS cipher suites
528/// - Using an alternative TLS connector library (not the default, rustls)
529/// - CA trust root certificates (illustrated using WebPKI below)
530///
531/// ```no_run,ignore
532/// use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
533///
534/// let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
535///     .with_webpki_roots()
536///     .https_only()
537///     .enable_http1()
538///     .enable_http2()
539///     .build();
540/// let http_client = HyperClientBuilder::new().build(https_connector);
541///
542/// // This connector can then be given to a generated service Config
543/// let config = my_service_client::Config::builder()
544///     .endpoint_url("https://example.com")
545///     .http_client(http_client)
546///     .build();
547/// let client = my_service_client::Client::from_conf(config);
548/// ```
549#[derive(Clone, Default, Debug)]
550pub struct HyperClientBuilder {
551    client_builder: Option<hyper_0_14::client::Builder>,
552}
553
554impl HyperClientBuilder {
555    /// Creates a new builder.
556    pub fn new() -> Self {
557        Self::default()
558    }
559
560    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
561    ///
562    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
563    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
564        self.client_builder = Some(hyper_builder);
565        self
566    }
567
568    /// Override the Hyper client [`Builder`](hyper_0_14::client::Builder) used to construct this client.
569    ///
570    /// This enables changing settings like forcing HTTP2 and modifying other default client behavior.
571    pub fn set_hyper_builder(
572        &mut self,
573        hyper_builder: Option<hyper_0_14::client::Builder>,
574    ) -> &mut Self {
575        self.client_builder = hyper_builder;
576        self
577    }
578
579    /// Create a hyper client with the default rustls HTTPS implementation.
580    ///
581    /// The trusted certificates will be loaded later when this becomes the selected
582    /// HTTP client for a Smithy client.
583    #[cfg(feature = "tls-rustls")]
584    pub fn build_https(self) -> SharedHttpClient {
585        self.build_with_fn(default_connector::https)
586    }
587
588    /// Create a [`SharedHttpClient`] from this builder and a given connector.
589    ///
590    #[cfg_attr(
591        feature = "tls-rustls",
592        doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
593    )]
594    pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
595    where
596        C: Clone + Send + Sync + 'static,
597        C: hyper_0_14::service::Service<http_02x::Uri>,
598        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
599        C::Future: Unpin + Send + 'static,
600        C::Error: Into<BoxError>,
601    {
602        self.build_with_fn(move || tcp_connector.clone())
603    }
604
605    fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
606    where
607        F: Fn() -> C + Send + Sync + 'static,
608        C: Clone + Send + Sync + 'static,
609        C: hyper_0_14::service::Service<http_02x::Uri>,
610        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
611        C::Future: Unpin + Send + 'static,
612        C::Error: Into<BoxError>,
613    {
614        SharedHttpClient::new(HyperClient {
615            connector_cache: RwLock::new(HashMap::new()),
616            client_builder: self.client_builder.unwrap_or_default(),
617            tcp_connector_fn,
618        })
619    }
620}
621
622mod timeout_middleware {
623    use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
624    use aws_smithy_async::rt::sleep::Sleep;
625    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
626    use aws_smithy_runtime_api::box_error::BoxError;
627    use pin_project_lite::pin_project;
628    use std::error::Error;
629    use std::fmt::Formatter;
630    use std::future::Future;
631    use std::pin::Pin;
632    use std::task::{Context, Poll};
633    use std::time::Duration;
634
635    #[derive(Debug)]
636    pub(crate) struct HttpTimeoutError {
637        kind: &'static str,
638        duration: Duration,
639    }
640
641    impl std::fmt::Display for HttpTimeoutError {
642        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
643            write!(
644                f,
645                "{} timeout occurred after {:?}",
646                self.kind, self.duration
647            )
648        }
649    }
650
651    impl Error for HttpTimeoutError {
652        // We implement the `source` function as returning a `TimedOutError` because when `downcast_error`
653        // or `find_source` is called with an `HttpTimeoutError` (or another error wrapping an `HttpTimeoutError`)
654        // this method will be checked to determine if it's a timeout-related error.
655        fn source(&self) -> Option<&(dyn Error + 'static)> {
656            Some(&TimedOutError)
657        }
658    }
659
660    /// Timeout wrapper that will timeout on the initial TCP connection
661    ///
662    /// # Stability
663    /// This interface is unstable.
664    #[derive(Clone, Debug)]
665    pub(super) struct ConnectTimeout<I> {
666        inner: I,
667        timeout: Option<(SharedAsyncSleep, Duration)>,
668    }
669
670    impl<I> ConnectTimeout<I> {
671        /// Create a new `ConnectTimeout` around `inner`.
672        ///
673        /// Typically, `I` will implement [`hyper_0_14::client::connect::Connect`].
674        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
675            Self {
676                inner,
677                timeout: Some((sleep, timeout)),
678            }
679        }
680
681        pub(crate) fn no_timeout(inner: I) -> Self {
682            Self {
683                inner,
684                timeout: None,
685            }
686        }
687    }
688
689    #[derive(Clone, Debug)]
690    pub(crate) struct HttpReadTimeout<I> {
691        inner: I,
692        timeout: Option<(SharedAsyncSleep, Duration)>,
693    }
694
695    impl<I> HttpReadTimeout<I> {
696        /// Create a new `HttpReadTimeout` around `inner`.
697        ///
698        /// Typically, `I` will implement [`hyper_0_14::service::Service<http::Request<SdkBody>>`].
699        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
700            Self {
701                inner,
702                timeout: Some((sleep, timeout)),
703            }
704        }
705
706        pub(crate) fn no_timeout(inner: I) -> Self {
707            Self {
708                inner,
709                timeout: None,
710            }
711        }
712    }
713
714    pin_project! {
715        /// Timeout future for Tower services
716        ///
717        /// Timeout future to handle timing out, mapping errors, and the possibility of not timing out
718        /// without incurring an additional allocation for each timeout layer.
719        #[project = MaybeTimeoutFutureProj]
720        pub enum MaybeTimeoutFuture<F> {
721            Timeout {
722                #[pin]
723                timeout: Timeout<F, Sleep>,
724                error_type: &'static str,
725                duration: Duration,
726            },
727            NoTimeout {
728                #[pin]
729                future: F
730            }
731        }
732    }
733
734    impl<F, T, E> Future for MaybeTimeoutFuture<F>
735    where
736        F: Future<Output = Result<T, E>>,
737        E: Into<BoxError>,
738    {
739        type Output = Result<T, BoxError>;
740
741        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
742            let (timeout_future, kind, &mut duration) = match self.project() {
743                MaybeTimeoutFutureProj::NoTimeout { future } => {
744                    return future.poll(cx).map_err(|err| err.into());
745                }
746                MaybeTimeoutFutureProj::Timeout {
747                    timeout,
748                    error_type,
749                    duration,
750                } => (timeout, error_type, duration),
751            };
752            match timeout_future.poll(cx) {
753                Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
754                Poll::Ready(Err(_timeout)) => {
755                    Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
756                }
757                Poll::Pending => Poll::Pending,
758            }
759        }
760    }
761
762    impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
763    where
764        I: hyper_0_14::service::Service<http_02x::Uri>,
765        I::Error: Into<BoxError>,
766    {
767        type Response = I::Response;
768        type Error = BoxError;
769        type Future = MaybeTimeoutFuture<I::Future>;
770
771        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
772            self.inner.poll_ready(cx).map_err(|err| err.into())
773        }
774
775        fn call(&mut self, req: http_02x::Uri) -> Self::Future {
776            match &self.timeout {
777                Some((sleep, duration)) => {
778                    let sleep = sleep.sleep(*duration);
779                    MaybeTimeoutFuture::Timeout {
780                        timeout: Timeout::new(self.inner.call(req), sleep),
781                        error_type: "HTTP connect",
782                        duration: *duration,
783                    }
784                }
785                None => MaybeTimeoutFuture::NoTimeout {
786                    future: self.inner.call(req),
787                },
788            }
789        }
790    }
791
792    impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
793    where
794        I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
795    {
796        type Response = I::Response;
797        type Error = BoxError;
798        type Future = MaybeTimeoutFuture<I::Future>;
799
800        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
801            self.inner.poll_ready(cx).map_err(|err| err.into())
802        }
803
804        fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
805            match &self.timeout {
806                Some((sleep, duration)) => {
807                    let sleep = sleep.sleep(*duration);
808                    MaybeTimeoutFuture::Timeout {
809                        timeout: Timeout::new(self.inner.call(req), sleep),
810                        error_type: "HTTP read",
811                        duration: *duration,
812                    }
813                }
814                None => MaybeTimeoutFuture::NoTimeout {
815                    future: self.inner.call(req),
816                },
817            }
818        }
819    }
820
#[cfg(test)]
mod test {
    use crate::client::http::hyper_014::HyperConnector;
    use aws_smithy_async::assert_elapsed;
    use aws_smithy_async::future::never::Never;
    use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
    use aws_smithy_runtime_api::box_error::BoxError;
    use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_runtime_api::client::result::ConnectorError;
    use aws_smithy_types::error::display::DisplayErrorContext;
    use hyper_0_14::client::connect::{Connected, Connection};
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
    use tokio::net::TcpStream;

    // Never called: it only has to type-check, which proves that `ConnectTimeout<T>` is
    // `Send + Sync` whenever `T` is.
    #[allow(unused)]
    fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
        is_send_sync::<super::ConnectTimeout<T>>();
    }

    // Never called: instantiating it is a compile-time `Send + Sync` assertion on `T`.
    #[allow(unused)]
    fn is_send_sync<T: Send + Sync>() {}

    /// A connector that is always ready but never finishes connecting.
    ///
    /// Every future it hands out stays pending forever.
    #[non_exhaustive]
    #[derive(Clone, Default, Debug)]
    struct NeverConnects;
    impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
        type Response = TcpStream;
        type Error = ConnectorError;
        type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
            Box::pin(async move {
                // Awaiting `Never` does not complete, so the line below is never reached.
                Never::new().await;
                unreachable!()
            })
        }
    }

    /// A connector that connects immediately, yielding a stream that never carries data.
    #[derive(Clone, Debug, Default)]
    struct NeverReplies;
    impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
        type Response = EmptyStream;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
            let stream = EmptyStream;
            std::future::ready(Ok(stream))
        }
    }

    /// A stream whose read, write, flush and shutdown operations all stay pending.
    #[non_exhaustive]
    #[derive(Debug, Default)]
    struct EmptyStream;
    impl AsyncRead for EmptyStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }
    }
    impl AsyncWrite for EmptyStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, std::io::Error>> {
            Poll::Pending
        }

        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), std::io::Error>> {
            Poll::Pending
        }

        fn poll_shutdown(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), std::io::Error>> {
            Poll::Pending
        }
    }
    impl Connection for EmptyStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    /// With a connector that never connects, a 1s connect timeout must surface as a timeout
    /// error carrying the "HTTP connect" message after one second of (paused) tokio time.
    #[tokio::test]
    async fn http_connect_timeout_works() {
        let settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .build();
        let adapter = HyperConnector::builder()
            .connector_settings(settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .build(NeverConnects::default())
            .adapter;

        let start = tokio::time::Instant::now();
        tokio::time::pause();
        let request = HttpRequest::get("https://static-uri.com").unwrap();
        let resp = adapter.call(request).await.unwrap_err();

        assert!(
            resp.is_timeout(),
            "expected resp.is_timeout() to be true but it was false, resp == {:?}",
            resp
        );
        let expected =
            "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
        let message = format!("{}", DisplayErrorContext(&resp));
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(start, Duration::from_secs(1));
    }

    /// With a connection that never replies, the 2s read timeout (not the 1s connect
    /// timeout) must surface as a timeout error carrying the "HTTP read" message.
    #[tokio::test]
    async fn http_read_timeout_works() {
        let settings = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(1))
            .read_timeout(Duration::from_secs(2))
            .build();
        let adapter = HyperConnector::builder()
            .connector_settings(settings)
            .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
            .build(NeverReplies)
            .adapter;

        let start = tokio::time::Instant::now();
        tokio::time::pause();
        let request = HttpRequest::get("https://fake-uri.com").unwrap();
        let err = adapter.call(request).await.unwrap_err();

        assert!(
            err.is_timeout(),
            "expected err.is_timeout() to be true but it was false, err == {err:?}",
        );
        let expected = "timeout: HTTP read timeout occurred after 2s";
        let message = DisplayErrorContext(&err).to_string();
        assert!(
            message.contains(expected),
            "expected '{message}' to contain '{expected}'"
        );
        assert_elapsed!(start, Duration::from_secs(2));
    }
}
994}
995
#[cfg(all(test, feature = "test-util"))]
mod test {
    use crate::client::http::hyper_014::{HyperClientBuilder, HyperConnector};
    use crate::client::http::test_util::NeverTcpConnector;
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::box_error::BoxError;
    use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use hyper_0_14::client::connect::{Connected, Connection};
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    /// Requests connectors for four distinct timeout configurations from many concurrent
    /// tasks and checks that the connector factory ran exactly four times.
    #[tokio::test]
    async fn connector_selection() {
        // The factory closure bumps this counter each time it is asked for a TCP connector.
        let creation_count = Arc::new(AtomicU32::new(0));
        let factory_count = creation_count.clone();
        let http_client = HyperClientBuilder::new().build_with_fn(move || {
            factory_count.fetch_add(1, Ordering::Relaxed);
            NeverTcpConnector::new()
        });

        // Four distinct timeout combinations, so four separate connectors are expected.
        let connect_only = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(3))
            .build();
        let read_only = HttpConnectorSettings::builder()
            .read_timeout(Duration::from_secs(3))
            .build();
        let connect_and_read = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(3))
            .read_timeout(Duration::from_secs(3))
            .build();
        let longer_connect_and_read = HttpConnectorSettings::builder()
            .connect_timeout(Duration::from_secs(5))
            .read_timeout(Duration::from_secs(3))
            .build();
        let settings = [
            connect_only,
            read_only,
            connect_and_read,
            longer_connect_and_read,
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        // Spawn 1000 tasks per configuration; every task is spawned before any is awaited.
        let handles: Vec<_> = settings
            .iter()
            .flat_map(|setting| (0..1000).map(move |_| setting))
            .map(|setting| {
                let client = http_client.clone();
                let setting = setting.clone();
                let components = components.clone();
                tokio::spawn(async move {
                    let _ = client.http_connector(&setting, &components);
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }

        // Despite the concurrent requests, only one connector per configuration was built.
        assert_eq!(4, creation_count.load(Ordering::Relaxed));
    }

    /// A connection whose reads fail with `ConnectionReset` must be reported as an IO error.
    #[tokio::test]
    async fn hyper_io_error() {
        let adapter = HyperConnector::builder()
            .build(TestConnection {
                inner: HangupStream,
            })
            .adapter;
        let request = HttpRequest::get("https://socket-hangup.com").unwrap();
        let err = adapter.call(request).await.expect_err("socket hangup");
        assert!(err.is_io(), "{:?}", err);
    }

    // ---- supporting types: a Hyper connector whose connection fails with an IO error

    /// A stream whose reads fail with `ConnectionReset` and whose write-side operations
    /// stay pending.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl AsyncRead for HangupStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let reset = Error::new(ErrorKind::ConnectionReset, "connection reset");
            Poll::Ready(Err(reset))
        }
    }

    impl AsyncWrite for HangupStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }

    /// A connector that is always ready and answers every call with a clone of `inner`.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
            let connection = self.inner.clone();
            std::future::ready(Ok(connection))
        }
    }
}