1use crate::client::http::connection_poisoning::CaptureSmithyConnection;
7use crate::client::http::hyper_014::timeout_middleware::HttpTimeoutError;
8use aws_smithy_async::future::timeout::TimedOutError;
9use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
10use aws_smithy_runtime_api::box_error::BoxError;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14 HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15 SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20 RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
/// Plumbing for the default rustls-backed connector.
///
/// Only compiled when the `tls-rustls` feature is enabled.
#[cfg(feature = "tls-rustls")]
mod default_connector {
    use aws_smithy_async::rt::sleep::SharedAsyncSleep;
    use aws_smithy_runtime_api::client::http::HttpConnectorSettings;

    /// Concrete connector type produced by the default TLS configuration.
    pub(crate) type RustlsConnector =
        hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>;

    /// Lazily-initialized HTTPS connector shared by every default client.
    ///
    /// It is built by [`default_tls`] on first access; [`https`] hands out clones of it,
    /// so the TLS configuration is only assembled once per process.
    pub(crate) static HTTPS_NATIVE_ROOTS: once_cell::sync::Lazy<RustlsConnector> =
        once_cell::sync::Lazy::new(default_tls);

    /// Assembles the rustls client configuration used by [`default_tls`]:
    /// an explicit cipher-suite list, the safe default key-exchange groups and protocol
    /// versions, the platform's native root certificates, and no client authentication.
    fn tls_config() -> rustls::ClientConfig {
        use hyper_rustls::ConfigBuilderExt;

        rustls::ClientConfig::builder()
            .with_cipher_suites(&[
                // TLS 1.3 suites
                rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
                // TLS 1.2 suites
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
            ])
            .with_safe_default_kx_groups()
            .with_safe_default_protocol_versions()
            .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
            .with_native_roots()
            .with_no_client_auth()
    }

    /// Builds the connector stored in [`HTTPS_NATIVE_ROOTS`].
    ///
    /// The connector accepts both `https` and `http` URIs and has HTTP/1 and HTTP/2 enabled.
    fn default_tls() -> RustlsConnector {
        hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(tls_config())
            .https_or_http()
            .enable_http1()
            .enable_http2()
            .build()
    }

    /// Starts a [`HyperConnectorBuilder`](super::HyperConnectorBuilder) from the given settings,
    /// attaching the sleep implementation when one is supplied.
    pub(super) fn base(
        settings: &HttpConnectorSettings,
        sleep: Option<SharedAsyncSleep>,
    ) -> super::HyperConnectorBuilder {
        let builder = super::HyperConnector::builder().connector_settings(settings.clone());
        match sleep {
            Some(sleep_impl) => builder.sleep_impl(sleep_impl),
            None => builder,
        }
    }

    /// Returns a clone of the shared default HTTPS connector, initializing it on first use.
    pub(super) fn https() -> RustlsConnector {
        HTTPS_NATIVE_ROOTS.clone()
    }
}
95
96pub fn default_connector(
98 settings: &HttpConnectorSettings,
99 sleep: Option<SharedAsyncSleep>,
100) -> Option<SharedHttpConnector> {
101 #[cfg(feature = "tls-rustls")]
102 {
103 tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
104 let hyper = default_connector::base(settings, sleep).build_https();
105 Some(SharedHttpConnector::new(hyper))
106 }
107 #[cfg(not(feature = "tls-rustls"))]
108 {
109 tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
110 None
111 }
112}
113
114pub fn default_client() -> Option<SharedHttpClient> {
116 #[cfg(feature = "tls-rustls")]
117 {
118 tracing::trace!("creating a new default hyper 0.14.x client");
119 Some(HyperClientBuilder::new().build_https())
120 }
121 #[cfg(not(feature = "tls-rustls"))]
122 {
123 tracing::trace!("no default connector available");
124 None
125 }
126}
127
128#[derive(Debug)]
136pub struct HyperConnector {
137 adapter: Box<dyn HttpConnector>,
138}
139
140impl HyperConnector {
141 pub fn builder() -> HyperConnectorBuilder {
143 Default::default()
144 }
145}
146
147impl HttpConnector for HyperConnector {
148 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
149 self.adapter.call(request)
150 }
151}
152
153#[derive(Default, Debug)]
155pub struct HyperConnectorBuilder {
156 connector_settings: Option<HttpConnectorSettings>,
157 sleep_impl: Option<SharedAsyncSleep>,
158 client_builder: Option<hyper_0_14::client::Builder>,
159}
160
161impl HyperConnectorBuilder {
162 pub fn build<C>(self, tcp_connector: C) -> HyperConnector
164 where
165 C: Clone + Send + Sync + 'static,
166 C: hyper_0_14::service::Service<http_02x::Uri>,
167 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
168 C::Future: Unpin + Send + 'static,
169 C::Error: Into<BoxError>,
170 {
171 let client_builder = self.client_builder.unwrap_or_default();
172 let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
173 let (connect_timeout, read_timeout) = self
174 .connector_settings
175 .map(|c| (c.connect_timeout(), c.read_timeout()))
176 .unwrap_or((None, None));
177
178 let connector = match connect_timeout {
179 Some(duration) => timeout_middleware::ConnectTimeout::new(
180 tcp_connector,
181 sleep_impl
182 .clone()
183 .expect("a sleep impl must be provided in order to have a connect timeout"),
184 duration,
185 ),
186 None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
187 };
188 let base = client_builder.build(connector);
189 let read_timeout = match read_timeout {
190 Some(duration) => timeout_middleware::HttpReadTimeout::new(
191 base,
192 sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
193 duration,
194 ),
195 None => timeout_middleware::HttpReadTimeout::no_timeout(base),
196 };
197 HyperConnector {
198 adapter: Box::new(Adapter {
199 client: read_timeout,
200 }),
201 }
202 }
203
204 #[cfg(feature = "tls-rustls")]
206 pub fn build_https(self) -> HyperConnector {
207 self.build(default_connector::https())
208 }
209
210 pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
215 self.sleep_impl = Some(sleep_impl.into_shared());
216 self
217 }
218
219 pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
224 self.sleep_impl = sleep_impl;
225 self
226 }
227
228 pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
230 self.connector_settings = Some(connector_settings);
231 self
232 }
233
234 pub fn set_connector_settings(
236 &mut self,
237 connector_settings: Option<HttpConnectorSettings>,
238 ) -> &mut Self {
239 self.connector_settings = connector_settings;
240 self
241 }
242
243 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
247 self.client_builder = Some(hyper_builder);
248 self
249 }
250
251 pub fn set_hyper_builder(
255 &mut self,
256 hyper_builder: Option<hyper_0_14::client::Builder>,
257 ) -> &mut Self {
258 self.client_builder = hyper_builder;
259 self
260 }
261}
262
263struct Adapter<C> {
267 client: timeout_middleware::HttpReadTimeout<
268 hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
269 >,
270}
271
272impl<C> fmt::Debug for Adapter<C> {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 f.debug_struct("Adapter")
275 .field("client", &"** hyper client **")
276 .finish()
277 }
278}
279
280fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
282 let capture_conn = capture_conn.clone();
283 if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
284 let mut extensions = http_02x::Extensions::new();
285 conn.get_extras(&mut extensions);
286 let http_info = extensions.get::<HttpInfo>();
287 let mut builder = ConnectionMetadata::builder()
288 .proxied(conn.is_proxied())
289 .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
290 Some(conn) => conn.poison(),
291 None => tracing::trace!("no connection existed to poison"),
292 });
293
294 builder
295 .set_local_addr(http_info.map(|info| info.local_addr()))
296 .set_remote_addr(http_info.map(|info| info.remote_addr()));
297
298 let smithy_connection = builder.build();
299
300 Some(smithy_connection)
301 } else {
302 None
303 }
304}
305
306impl<C> HttpConnector for Adapter<C>
307where
308 C: Clone + Send + Sync + 'static,
309 C: hyper_0_14::service::Service<http_02x::Uri>,
310 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
311 C::Future: Unpin + Send + 'static,
312 C::Error: Into<BoxError>,
313{
314 fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
315 use hyper_0_14::service::Service;
316
317 let mut request = match request.try_into_http02x() {
318 Ok(request) => request,
319 Err(err) => {
320 return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
321 }
322 };
323 let capture_connection = capture_connection(&mut request);
324 if let Some(capture_smithy_connection) =
325 request.extensions().get::<CaptureSmithyConnection>()
326 {
327 capture_smithy_connection
328 .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
329 }
330 let mut client = self.client.clone();
331 let fut = client.call(request);
332 HttpConnectorFuture::new(async move {
333 let response = fut
334 .await
335 .map_err(downcast_error)?
336 .map(SdkBody::from_body_0_4);
337 match HttpResponse::try_from(response) {
338 Ok(response) => Ok(response),
339 Err(err) => Err(ConnectorError::other(err.into(), None)),
340 }
341 })
342 }
343}
344
345fn downcast_error(err: BoxError) -> ConnectorError {
347 if find_source::<TimedOutError>(err.as_ref()).is_some() {
349 return ConnectorError::timeout(err);
350 }
351 let err = match err.downcast::<ConnectorError>() {
353 Ok(connector_error) => return *connector_error,
354 Err(box_error) => box_error,
355 };
356 let err = match err.downcast::<hyper_0_14::Error>() {
359 Ok(hyper_error) => return to_connector_error(*hyper_error),
360 Err(box_error) => box_error,
361 };
362
363 ConnectorError::other(err, None)
365}
366
367fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
369 if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
370 return ConnectorError::timeout(err.into());
371 }
372 if err.is_user() {
373 return ConnectorError::user(err.into());
374 }
375 if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
376 return ConnectorError::io(err.into());
377 }
378 if err.is_incomplete_message() {
380 return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
381 }
382 if let Some(h2_err) = find_source::<h2::Error>(&err) {
383 if h2_err.is_go_away()
384 || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
385 {
386 return ConnectorError::io(err.into());
387 }
388 }
389
390 tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
391 ConnectorError::other(err.into(), None)
392}
393
/// Searches `err` and its chain of `source()` errors for the first error of type `E`.
///
/// The error itself is checked before its sources. Returns `None` when the chain ends
/// without a match.
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
    let mut current = err;
    loop {
        if let Some(found) = current.downcast_ref::<E>() {
            return Some(found);
        }
        // End of the chain: `?` returns `None`.
        current = current.source()?;
    }
}
404
405#[derive(Clone, Debug, Eq, PartialEq, Hash)]
406struct CacheKey {
407 connect_timeout: Option<Duration>,
408 read_timeout: Option<Duration>,
409}
410
411impl From<&HttpConnectorSettings> for CacheKey {
412 fn from(value: &HttpConnectorSettings) -> Self {
413 Self {
414 connect_timeout: value.connect_timeout(),
415 read_timeout: value.read_timeout(),
416 }
417 }
418}
419
420struct HyperClient<F> {
421 connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
422 client_builder: hyper_0_14::client::Builder,
423 tcp_connector_fn: F,
424}
425
426impl<F> fmt::Debug for HyperClient<F> {
427 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
428 f.debug_struct("HyperClient")
429 .field("connector_cache", &self.connector_cache)
430 .field("client_builder", &self.client_builder)
431 .finish()
432 }
433}
434
435impl<C, F> HttpClient for HyperClient<F>
436where
437 F: Fn() -> C + Send + Sync,
438 C: Clone + Send + Sync + 'static,
439 C: hyper_0_14::service::Service<http_02x::Uri>,
440 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
441 C::Future: Unpin + Send + 'static,
442 C::Error: Into<BoxError>,
443{
444 fn http_connector(
445 &self,
446 settings: &HttpConnectorSettings,
447 components: &RuntimeComponents,
448 ) -> SharedHttpConnector {
449 let key = CacheKey::from(settings);
450 let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
451 if connector.is_none() {
452 let mut cache = self.connector_cache.write().unwrap();
453 if !cache.contains_key(&key) {
455 let mut builder = HyperConnector::builder()
456 .hyper_builder(self.client_builder.clone())
457 .connector_settings(settings.clone());
458 builder.set_sleep_impl(components.sleep_impl());
459
460 let start = components.time_source().map(|ts| ts.now());
461 let tcp_connector = (self.tcp_connector_fn)();
462 let end = components.time_source().map(|ts| ts.now());
463 if let (Some(start), Some(end)) = (start, end) {
464 if let Ok(elapsed) = end.duration_since(start) {
465 tracing::debug!("new TCP connector created in {:?}", elapsed);
466 }
467 }
468 let connector = SharedHttpConnector::new(builder.build(tcp_connector));
469 cache.insert(key.clone(), connector);
470 }
471 connector = cache.get(&key).cloned();
472 }
473
474 connector.expect("cache populated above")
475 }
476
477 fn validate_base_client_config(
478 &self,
479 _: &RuntimeComponentsBuilder,
480 _: &ConfigBag,
481 ) -> Result<(), BoxError> {
482 let _ = (self.tcp_connector_fn)();
488 Ok(())
489 }
490
491 fn connector_metadata(&self) -> Option<ConnectorMetadata> {
492 Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
493 }
494}
495
496#[derive(Clone, Default, Debug)]
550pub struct HyperClientBuilder {
551 client_builder: Option<hyper_0_14::client::Builder>,
552}
553
554impl HyperClientBuilder {
555 pub fn new() -> Self {
557 Self::default()
558 }
559
560 pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
564 self.client_builder = Some(hyper_builder);
565 self
566 }
567
568 pub fn set_hyper_builder(
572 &mut self,
573 hyper_builder: Option<hyper_0_14::client::Builder>,
574 ) -> &mut Self {
575 self.client_builder = hyper_builder;
576 self
577 }
578
579 #[cfg(feature = "tls-rustls")]
584 pub fn build_https(self) -> SharedHttpClient {
585 self.build_with_fn(default_connector::https)
586 }
587
588 #[cfg_attr(
591 feature = "tls-rustls",
592 doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
593 )]
594 pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
595 where
596 C: Clone + Send + Sync + 'static,
597 C: hyper_0_14::service::Service<http_02x::Uri>,
598 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
599 C::Future: Unpin + Send + 'static,
600 C::Error: Into<BoxError>,
601 {
602 self.build_with_fn(move || tcp_connector.clone())
603 }
604
605 fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
606 where
607 F: Fn() -> C + Send + Sync + 'static,
608 C: Clone + Send + Sync + 'static,
609 C: hyper_0_14::service::Service<http_02x::Uri>,
610 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
611 C::Future: Unpin + Send + 'static,
612 C::Error: Into<BoxError>,
613 {
614 SharedHttpClient::new(HyperClient {
615 connector_cache: RwLock::new(HashMap::new()),
616 client_builder: self.client_builder.unwrap_or_default(),
617 tcp_connector_fn,
618 })
619 }
620}
621
622mod timeout_middleware {
623 use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
624 use aws_smithy_async::rt::sleep::Sleep;
625 use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
626 use aws_smithy_runtime_api::box_error::BoxError;
627 use pin_project_lite::pin_project;
628 use std::error::Error;
629 use std::fmt::Formatter;
630 use std::future::Future;
631 use std::pin::Pin;
632 use std::task::{Context, Poll};
633 use std::time::Duration;
634
635 #[derive(Debug)]
636 pub(crate) struct HttpTimeoutError {
637 kind: &'static str,
638 duration: Duration,
639 }
640
641 impl std::fmt::Display for HttpTimeoutError {
642 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
643 write!(
644 f,
645 "{} timeout occurred after {:?}",
646 self.kind, self.duration
647 )
648 }
649 }
650
651 impl Error for HttpTimeoutError {
652 fn source(&self) -> Option<&(dyn Error + 'static)> {
656 Some(&TimedOutError)
657 }
658 }
659
660 #[derive(Clone, Debug)]
665 pub(super) struct ConnectTimeout<I> {
666 inner: I,
667 timeout: Option<(SharedAsyncSleep, Duration)>,
668 }
669
670 impl<I> ConnectTimeout<I> {
671 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
675 Self {
676 inner,
677 timeout: Some((sleep, timeout)),
678 }
679 }
680
681 pub(crate) fn no_timeout(inner: I) -> Self {
682 Self {
683 inner,
684 timeout: None,
685 }
686 }
687 }
688
689 #[derive(Clone, Debug)]
690 pub(crate) struct HttpReadTimeout<I> {
691 inner: I,
692 timeout: Option<(SharedAsyncSleep, Duration)>,
693 }
694
695 impl<I> HttpReadTimeout<I> {
696 pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
700 Self {
701 inner,
702 timeout: Some((sleep, timeout)),
703 }
704 }
705
706 pub(crate) fn no_timeout(inner: I) -> Self {
707 Self {
708 inner,
709 timeout: None,
710 }
711 }
712 }
713
    pin_project! {
        #[project = MaybeTimeoutFutureProj]
        /// Future returned by the timeout services in this module.
        ///
        /// It is either the inner future raced against a sleep (`Timeout`), together with
        /// the label and duration needed to build an `HttpTimeoutError` if the sleep wins,
        /// or the bare inner future (`NoTimeout`) when no timeout is configured. Both cases
        /// share this one type, so the services have a single `Future` associated type.
        pub enum MaybeTimeoutFuture<F> {
            Timeout {
                #[pin]
                timeout: Timeout<F, Sleep>,
                error_type: &'static str,
                duration: Duration,
            },
            NoTimeout {
                #[pin]
                future: F
            }
        }
    }
733
734 impl<F, T, E> Future for MaybeTimeoutFuture<F>
735 where
736 F: Future<Output = Result<T, E>>,
737 E: Into<BoxError>,
738 {
739 type Output = Result<T, BoxError>;
740
741 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
742 let (timeout_future, kind, &mut duration) = match self.project() {
743 MaybeTimeoutFutureProj::NoTimeout { future } => {
744 return future.poll(cx).map_err(|err| err.into());
745 }
746 MaybeTimeoutFutureProj::Timeout {
747 timeout,
748 error_type,
749 duration,
750 } => (timeout, error_type, duration),
751 };
752 match timeout_future.poll(cx) {
753 Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
754 Poll::Ready(Err(_timeout)) => {
755 Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
756 }
757 Poll::Pending => Poll::Pending,
758 }
759 }
760 }
761
762 impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
763 where
764 I: hyper_0_14::service::Service<http_02x::Uri>,
765 I::Error: Into<BoxError>,
766 {
767 type Response = I::Response;
768 type Error = BoxError;
769 type Future = MaybeTimeoutFuture<I::Future>;
770
771 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
772 self.inner.poll_ready(cx).map_err(|err| err.into())
773 }
774
775 fn call(&mut self, req: http_02x::Uri) -> Self::Future {
776 match &self.timeout {
777 Some((sleep, duration)) => {
778 let sleep = sleep.sleep(*duration);
779 MaybeTimeoutFuture::Timeout {
780 timeout: Timeout::new(self.inner.call(req), sleep),
781 error_type: "HTTP connect",
782 duration: *duration,
783 }
784 }
785 None => MaybeTimeoutFuture::NoTimeout {
786 future: self.inner.call(req),
787 },
788 }
789 }
790 }
791
792 impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
793 where
794 I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
795 {
796 type Response = I::Response;
797 type Error = BoxError;
798 type Future = MaybeTimeoutFuture<I::Future>;
799
800 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
801 self.inner.poll_ready(cx).map_err(|err| err.into())
802 }
803
804 fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
805 match &self.timeout {
806 Some((sleep, duration)) => {
807 let sleep = sleep.sleep(*duration);
808 MaybeTimeoutFuture::Timeout {
809 timeout: Timeout::new(self.inner.call(req), sleep),
810 error_type: "HTTP read",
811 duration: *duration,
812 }
813 }
814 None => MaybeTimeoutFuture::NoTimeout {
815 future: self.inner.call(req),
816 },
817 }
818 }
819 }
820
821 #[cfg(test)]
822 mod test {
823 use crate::client::http::hyper_014::HyperConnector;
824 use aws_smithy_async::assert_elapsed;
825 use aws_smithy_async::future::never::Never;
826 use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
827 use aws_smithy_runtime_api::box_error::BoxError;
828 use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
829 use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
830 use aws_smithy_runtime_api::client::result::ConnectorError;
831 use aws_smithy_types::error::display::DisplayErrorContext;
832 use hyper_0_14::client::connect::{Connected, Connection};
833 use std::future::Future;
834 use std::pin::Pin;
835 use std::task::{Context, Poll};
836 use std::time::Duration;
837 use tokio::io::ReadBuf;
838 use tokio::io::{AsyncRead, AsyncWrite};
839 use tokio::net::TcpStream;
840
841 #[allow(unused)]
842 fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
843 is_send_sync::<super::ConnectTimeout<T>>();
844 }
845
846 #[allow(unused)]
847 fn is_send_sync<T: Send + Sync>() {}
848
849 #[non_exhaustive]
853 #[derive(Clone, Default, Debug)]
854 struct NeverConnects;
855 impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
856 type Response = TcpStream;
857 type Error = ConnectorError;
858 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
859
860 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
861 Poll::Ready(Ok(()))
862 }
863
864 fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
865 Box::pin(async move {
866 Never::new().await;
867 unreachable!()
868 })
869 }
870 }
871
872 #[derive(Clone, Debug, Default)]
874 struct NeverReplies;
875 impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
876 type Response = EmptyStream;
877 type Error = BoxError;
878 type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
879
880 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
881 Poll::Ready(Ok(()))
882 }
883
884 fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
885 std::future::ready(Ok(EmptyStream))
886 }
887 }
888
889 #[non_exhaustive]
891 #[derive(Debug, Default)]
892 struct EmptyStream;
893 impl AsyncRead for EmptyStream {
894 fn poll_read(
895 self: Pin<&mut Self>,
896 _cx: &mut Context<'_>,
897 _buf: &mut ReadBuf<'_>,
898 ) -> Poll<std::io::Result<()>> {
899 Poll::Pending
900 }
901 }
902 impl AsyncWrite for EmptyStream {
903 fn poll_write(
904 self: Pin<&mut Self>,
905 _cx: &mut Context<'_>,
906 _buf: &[u8],
907 ) -> Poll<Result<usize, std::io::Error>> {
908 Poll::Pending
909 }
910
911 fn poll_flush(
912 self: Pin<&mut Self>,
913 _cx: &mut Context<'_>,
914 ) -> Poll<Result<(), std::io::Error>> {
915 Poll::Pending
916 }
917
918 fn poll_shutdown(
919 self: Pin<&mut Self>,
920 _cx: &mut Context<'_>,
921 ) -> Poll<Result<(), std::io::Error>> {
922 Poll::Pending
923 }
924 }
925 impl Connection for EmptyStream {
926 fn connected(&self) -> Connected {
927 Connected::new()
928 }
929 }
930
931 #[tokio::test]
932 async fn http_connect_timeout_works() {
933 let tcp_connector = NeverConnects::default();
934 let connector_settings = HttpConnectorSettings::builder()
935 .connect_timeout(Duration::from_secs(1))
936 .build();
937 let hyper = HyperConnector::builder()
938 .connector_settings(connector_settings)
939 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
940 .build(tcp_connector)
941 .adapter;
942 let now = tokio::time::Instant::now();
943 tokio::time::pause();
944 let resp = hyper
945 .call(HttpRequest::get("https://static-uri.com").unwrap())
946 .await
947 .unwrap_err();
948 assert!(
949 resp.is_timeout(),
950 "expected resp.is_timeout() to be true but it was false, resp == {:?}",
951 resp
952 );
953 let message = DisplayErrorContext(&resp).to_string();
954 let expected =
955 "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
956 assert!(
957 message.contains(expected),
958 "expected '{message}' to contain '{expected}'"
959 );
960 assert_elapsed!(now, Duration::from_secs(1));
961 }
962
963 #[tokio::test]
964 async fn http_read_timeout_works() {
965 let tcp_connector = NeverReplies;
966 let connector_settings = HttpConnectorSettings::builder()
967 .connect_timeout(Duration::from_secs(1))
968 .read_timeout(Duration::from_secs(2))
969 .build();
970 let hyper = HyperConnector::builder()
971 .connector_settings(connector_settings)
972 .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
973 .build(tcp_connector)
974 .adapter;
975 let now = tokio::time::Instant::now();
976 tokio::time::pause();
977 let err = hyper
978 .call(HttpRequest::get("https://fake-uri.com").unwrap())
979 .await
980 .unwrap_err();
981 assert!(
982 err.is_timeout(),
983 "expected err.is_timeout() to be true but it was false, err == {err:?}",
984 );
985 let message = format!("{}", DisplayErrorContext(&err));
986 let expected = "timeout: HTTP read timeout occurred after 2s";
987 assert!(
988 message.contains(expected),
989 "expected '{message}' to contain '{expected}'"
990 );
991 assert_elapsed!(now, Duration::from_secs(2));
992 }
993 }
994}
995
#[cfg(all(test, feature = "test-util"))]
mod test {
    use crate::client::http::hyper_014::{HyperClientBuilder, HyperConnector};
    use crate::client::http::test_util::NeverTcpConnector;
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::box_error::BoxError;
    use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use hyper_0_14::client::connect::{Connected, Connection};
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    /// Many concurrent lookups across four distinct timeout configurations must invoke the
    /// TCP connector factory exactly once per configuration.
    #[tokio::test]
    async fn connector_selection() {
        // Counts how many times the factory closure is invoked.
        let creation_count = Arc::new(AtomicU32::new(0));
        let http_client = HyperClientBuilder::new().build_with_fn({
            let count = creation_count.clone();
            move || {
                count.fetch_add(1, Ordering::Relaxed);
                NeverTcpConnector::new()
            }
        });

        let settings = [
            // connect timeout only
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .build(),
            // read timeout only
            HttpConnectorSettings::builder()
                .read_timeout(Duration::from_secs(3))
                .build(),
            // both timeouts
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(3))
                .read_timeout(Duration::from_secs(3))
                .build(),
            // both timeouts, different connect timeout
            HttpConnectorSettings::builder()
                .connect_timeout(Duration::from_secs(5))
                .read_timeout(Duration::from_secs(3))
                .build(),
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        // Spawn 1000 lookups per configuration, in configuration order.
        let handles: Vec<_> = settings
            .iter()
            .flat_map(|setting| std::iter::repeat(setting).take(1000))
            .map(|setting| {
                let client = http_client.clone();
                let setting = setting.clone();
                let components = components.clone();
                tokio::spawn(async move {
                    let _ = client.http_connector(&setting, &components);
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(4, creation_count.load(Ordering::Relaxed));
    }

    /// A connection whose reads fail with `ConnectionReset` must surface as an I/O error.
    #[tokio::test]
    async fn hyper_io_error() {
        let connector = TestConnection {
            inner: HangupStream,
        };
        let adapter = HyperConnector::builder().build(connector).adapter;
        let request = HttpRequest::get("https://socket-hangup.com").unwrap();
        let err = adapter.call(request).await.expect_err("socket hangup");
        assert!(err.is_io(), "{:?}", err);
    }

    /// A stream whose reads always fail with `ConnectionReset` and whose writes stay pending.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl AsyncRead for HangupStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let reset = Error::new(ErrorKind::ConnectionReset, "connection reset");
            Poll::Ready(Err(reset))
        }
    }

    impl AsyncWrite for HangupStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
            Poll::Pending
        }
    }

    /// Connector that immediately yields a clone of `inner` as the connection.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T>
    where
        T: Clone + Connection,
    {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<Self::Response, Self::Error>>;

        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
            let connection = self.inner.clone();
            std::future::ready(Ok(connection))
        }
    }
}