1use std::fmt;
83use std::fmt::{Debug, Formatter};
84use std::ops::Deref;
85use std::sync::atomic::{AtomicBool, Ordering};
86use std::sync::Arc;
87
88use ::http::Extensions;
89use tokio::sync::watch;
90
91cfg_feature! {
92 #![feature = "tcp"]
93
94 pub use self::http::{HttpConnector, HttpInfo};
95
96 pub mod dns;
97 mod http;
98}
99
100cfg_feature! {
101 #![any(feature = "http1", feature = "http2")]
102
103 pub use self::sealed::Connect;
104}
105
106pub trait Connection {
108 fn connected(&self) -> Connected;
110}
111
112#[derive(Debug)]
117pub struct Connected {
118 pub(super) alpn: Alpn,
119 pub(super) is_proxied: bool,
120 pub(super) extra: Option<Extra>,
121 pub(super) poisoned: PoisonPill,
122}
123
124#[derive(Clone)]
125pub(crate) struct PoisonPill {
126 poisoned: Arc<AtomicBool>,
127}
128
129impl Debug for PoisonPill {
130 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
131 write!(
133 f,
134 "PoisonPill@{:p} {{ poisoned: {} }}",
135 self.poisoned,
136 self.poisoned.load(Ordering::Relaxed)
137 )
138 }
139}
140
141impl PoisonPill {
142 pub(crate) fn healthy() -> Self {
143 Self {
144 poisoned: Arc::new(AtomicBool::new(false)),
145 }
146 }
147 pub(crate) fn poison(&self) {
148 self.poisoned.store(true, Ordering::Relaxed)
149 }
150
151 pub(crate) fn poisoned(&self) -> bool {
152 self.poisoned.load(Ordering::Relaxed)
153 }
154}
155
156#[derive(Debug, Clone)]
160pub struct CaptureConnection {
161 rx: watch::Receiver<Option<Connected>>,
162}
163
164pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
215 let (tx, rx) = CaptureConnection::new();
216 request.extensions_mut().insert(tx);
217 rx
218}
219
220#[derive(Clone)]
224pub(crate) struct CaptureConnectionExtension {
225 tx: Arc<watch::Sender<Option<Connected>>>,
226}
227
228impl CaptureConnectionExtension {
229 pub(crate) fn set(&self, connected: &Connected) {
230 self.tx.send_replace(Some(connected.clone()));
231 }
232}
233
234impl CaptureConnection {
235 pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
237 let (tx, rx) = watch::channel(None);
238 (
239 CaptureConnectionExtension { tx: Arc::new(tx) },
240 CaptureConnection { rx },
241 )
242 }
243
244 pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
246 self.rx.borrow()
247 }
248
249 pub async fn wait_for_connection_metadata(
254 &mut self,
255 ) -> impl Deref<Target = Option<Connected>> + '_ {
256 if self.rx.borrow().is_some() {
257 return self.rx.borrow();
258 }
259 let _ = self.rx.changed().await;
260 self.rx.borrow()
261 }
262}
263
264pub(super) struct Extra(Box<dyn ExtraInner>);
265
266#[derive(Clone, Copy, Debug, PartialEq)]
267pub(super) enum Alpn {
268 H2,
269 None,
270}
271
272impl Connected {
273 pub fn new() -> Connected {
275 Connected {
276 alpn: Alpn::None,
277 is_proxied: false,
278 extra: None,
279 poisoned: PoisonPill::healthy(),
280 }
281 }
282
283 pub fn proxy(mut self, is_proxied: bool) -> Connected {
302 self.is_proxied = is_proxied;
303 self
304 }
305
306 pub fn is_proxied(&self) -> bool {
308 self.is_proxied
309 }
310
311 pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
313 if let Some(prev) = self.extra {
314 self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
315 } else {
316 self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
317 }
318 self
319 }
320
321 pub fn get_extras(&self, extensions: &mut Extensions) {
323 if let Some(extra) = &self.extra {
324 extra.set(extensions);
325 }
326 }
327
328 pub fn negotiated_h2(mut self) -> Connected {
330 self.alpn = Alpn::H2;
331 self
332 }
333
334 pub fn is_negotiated_h2(&self) -> bool {
336 self.alpn == Alpn::H2
337 }
338
339 pub fn poison(&self) {
343 self.poisoned.poison();
344 tracing::debug!(
345 poison_pill = ?self.poisoned, "connection was poisoned"
346 );
347 }
348
349 pub(super) fn clone(&self) -> Connected {
352 Connected {
353 alpn: self.alpn.clone(),
354 is_proxied: self.is_proxied,
355 extra: self.extra.clone(),
356 poisoned: self.poisoned.clone(),
357 }
358 }
359}
360
361impl Extra {
364 pub(super) fn set(&self, res: &mut Extensions) {
365 self.0.set(res);
366 }
367}
368
369impl Clone for Extra {
370 fn clone(&self) -> Extra {
371 Extra(self.0.clone_box())
372 }
373}
374
375impl fmt::Debug for Extra {
376 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377 f.debug_struct("Extra").finish()
378 }
379}
380
381trait ExtraInner: Send + Sync {
382 fn clone_box(&self) -> Box<dyn ExtraInner>;
383 fn set(&self, res: &mut Extensions);
384}
385
386#[derive(Clone)]
390struct ExtraEnvelope<T>(T);
391
392impl<T> ExtraInner for ExtraEnvelope<T>
393where
394 T: Clone + Send + Sync + 'static,
395{
396 fn clone_box(&self) -> Box<dyn ExtraInner> {
397 Box::new(self.clone())
398 }
399
400 fn set(&self, res: &mut Extensions) {
401 res.insert(self.0.clone());
402 }
403}
404
405struct ExtraChain<T>(Box<dyn ExtraInner>, T);
406
407impl<T: Clone> Clone for ExtraChain<T> {
408 fn clone(&self) -> Self {
409 ExtraChain(self.0.clone_box(), self.1.clone())
410 }
411}
412
413impl<T> ExtraInner for ExtraChain<T>
414where
415 T: Clone + Send + Sync + 'static,
416{
417 fn clone_box(&self) -> Box<dyn ExtraInner> {
418 Box::new(self.clone())
419 }
420
421 fn set(&self, res: &mut Extensions) {
422 self.0.set(res);
423 res.insert(self.1.clone());
424 }
425}
426
427#[cfg(any(feature = "http1", feature = "http2"))]
428pub(super) mod sealed {
429 use std::error::Error as StdError;
430 use std::future::Future;
431 use std::marker::Unpin;
432
433 use ::http::Uri;
434 use tokio::io::{AsyncRead, AsyncWrite};
435
436 use super::Connection;
437
438 pub trait Connect: Sealed + Sized {
451 #[doc(hidden)]
452 type _Svc: ConnectSvc;
453 #[doc(hidden)]
454 fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
455 }
456
457 #[allow(unreachable_pub)]
458 pub trait ConnectSvc {
459 type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
460 type Error: Into<Box<dyn StdError + Send + Sync>>;
461 type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;
462
463 fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
464 }
465
466 impl<S, T> Connect for S
467 where
468 S: tower_service::Service<Uri, Response = T> + Send + 'static,
469 S::Error: Into<Box<dyn StdError + Send + Sync>>,
470 S::Future: Unpin + Send,
471 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
472 {
473 type _Svc = S;
474
475 fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
476 crate::service::oneshot(self, dst)
477 }
478 }
479
480 impl<S, T> ConnectSvc for S
481 where
482 S: tower_service::Service<Uri, Response = T> + Send + 'static,
483 S::Error: Into<Box<dyn StdError + Send + Sync>>,
484 S::Future: Unpin + Send,
485 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
486 {
487 type Connection = T;
488 type Error = S::Error;
489 type Future = crate::service::Oneshot<S, Uri>;
490
491 fn connect(self, _: Internal, dst: Uri) -> Self::Future {
492 crate::service::oneshot(self, dst)
493 }
494 }
495
496 impl<S, T> Sealed for S
497 where
498 S: tower_service::Service<Uri, Response = T> + Send,
499 S::Error: Into<Box<dyn StdError + Send + Sync>>,
500 S::Future: Unpin + Send,
501 T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
502 {
503 }
504
505 pub trait Sealed {}
506 #[allow(missing_debug_implementations)]
507 pub struct Internal;
508}
509
510#[cfg(test)]
511mod tests {
512 use super::Connected;
513 use crate::client::connect::CaptureConnection;
514
515 #[derive(Clone, Debug, PartialEq)]
516 struct Ex1(usize);
517
518 #[derive(Clone, Debug, PartialEq)]
519 struct Ex2(&'static str);
520
521 #[derive(Clone, Debug, PartialEq)]
522 struct Ex3(&'static str);
523
524 #[test]
525 fn test_connected_extra() {
526 let c1 = Connected::new().extra(Ex1(41));
527
528 let mut ex = ::http::Extensions::new();
529
530 assert_eq!(ex.get::<Ex1>(), None);
531
532 c1.extra.as_ref().expect("c1 extra").set(&mut ex);
533
534 assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41)));
535 }
536
537 #[test]
538 fn test_connected_extra_chain() {
539 let c1 = Connected::new()
543 .extra(Ex1(45))
544 .extra(Ex2("zoom"))
545 .extra(Ex3("pew pew"));
546
547 let mut ex1 = ::http::Extensions::new();
548
549 assert_eq!(ex1.get::<Ex1>(), None);
550 assert_eq!(ex1.get::<Ex2>(), None);
551 assert_eq!(ex1.get::<Ex3>(), None);
552
553 c1.extra.as_ref().expect("c1 extra").set(&mut ex1);
554
555 assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45)));
556 assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom")));
557 assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew")));
558
559 let c2 = Connected::new()
561 .extra(Ex1(33))
562 .extra(Ex2("hiccup"))
563 .extra(Ex1(99));
564
565 let mut ex2 = ::http::Extensions::new();
566
567 c2.extra.as_ref().expect("c2 extra").set(&mut ex2);
568
569 assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
570 assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
571 }
572
573 #[test]
574 fn test_sync_capture_connection() {
575 let (tx, rx) = CaptureConnection::new();
576 assert!(
577 rx.connection_metadata().is_none(),
578 "connection has not been set"
579 );
580 tx.set(&Connected::new().proxy(true));
581 assert_eq!(
582 rx.connection_metadata()
583 .as_ref()
584 .expect("connected should be set")
585 .is_proxied(),
586 true
587 );
588
589 assert_eq!(
591 rx.connection_metadata()
592 .as_ref()
593 .expect("connected should be set")
594 .is_proxied(),
595 true
596 );
597 }
598
599 #[tokio::test]
600 async fn async_capture_connection() {
601 let (tx, mut rx) = CaptureConnection::new();
602 assert!(
603 rx.connection_metadata().is_none(),
604 "connection has not been set"
605 );
606 let test_task = tokio::spawn(async move {
607 assert_eq!(
608 rx.wait_for_connection_metadata()
609 .await
610 .as_ref()
611 .expect("connection should be set")
612 .is_proxied(),
613 true
614 );
615 assert!(
617 rx.wait_for_connection_metadata().await.is_some(),
618 "should be awaitable multiple times"
619 );
620
621 assert_eq!(rx.connection_metadata().is_some(), true);
622 });
623 assert_eq!(test_task.is_finished(), false);
625 tx.set(&Connected::new().proxy(true));
626
627 assert!(test_task.await.is_ok());
628 }
629
630 #[tokio::test]
631 async fn capture_connection_sender_side_dropped() {
632 let (tx, mut rx) = CaptureConnection::new();
633 assert!(
634 rx.connection_metadata().is_none(),
635 "connection has not been set"
636 );
637 drop(tx);
638 assert!(rx.wait_for_connection_metadata().await.is_none());
639 }
640}