hyper/client/
pool.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::error::Error as StdError;
3use std::fmt;
4use std::future::Future;
5use std::marker::Unpin;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, Weak};
9use std::task::{Context, Poll};
10
11#[cfg(not(feature = "runtime"))]
12use std::time::{Duration, Instant};
13
14use futures_channel::oneshot;
15#[cfg(feature = "runtime")]
16use tokio::time::{Duration, Instant, Interval};
17use tracing::{debug, trace};
18
19use super::client::Ver;
20use crate::common::exec::Exec;
21
22// FIXME: allow() required due to `impl Trait` leaking types to this lint
23#[allow(missing_debug_implementations)]
24pub(super) struct Pool<T> {
25    // If the pool is disabled, this is None.
26    inner: Option<Arc<Mutex<PoolInner<T>>>>,
27}
28
29// Before using a pooled connection, make sure the sender is not dead.
30//
31// This is a trait to allow the `client::pool::tests` to work for `i32`.
32//
33// See https://github.com/hyperium/hyper/issues/1429
34pub(super) trait Poolable: Unpin + Send + Sized + 'static {
35    fn is_open(&self) -> bool;
36    /// Reserve this connection.
37    ///
38    /// Allows for HTTP/2 to return a shared reservation.
39    fn reserve(self) -> Reservation<Self>;
40    fn can_share(&self) -> bool;
41}
42
43/// When checking out a pooled connection, it might be that the connection
44/// only supports a single reservation, or it might be usable for many.
45///
46/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
47/// used for multiple requests.
48// FIXME: allow() required due to `impl Trait` leaking types to this lint
49#[allow(missing_debug_implementations)]
50pub(super) enum Reservation<T> {
51    /// This connection could be used multiple times, the first one will be
52    /// reinserted into the `idle` pool, and the second will be given to
53    /// the `Checkout`.
54    #[cfg(feature = "http2")]
55    Shared(T, T),
56    /// This connection requires unique access. It will be returned after
57    /// use is complete.
58    Unique(T),
59}
60
61/// Simple type alias in case the key type needs to be adjusted.
62pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
63
64struct PoolInner<T> {
65    // A flag that a connection is being established, and the connection
66    // should be shared. This prevents making multiple HTTP/2 connections
67    // to the same host.
68    connecting: HashSet<Key>,
69    // These are internal Conns sitting in the event loop in the KeepAlive
70    // state, waiting to receive a new Request to send on the socket.
71    idle: HashMap<Key, Vec<Idle<T>>>,
72    max_idle_per_host: usize,
73    // These are outstanding Checkouts that are waiting for a socket to be
74    // able to send a Request one. This is used when "racing" for a new
75    // connection.
76    //
77    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
78    // for the Pool to receive an idle Conn. When a Conn becomes idle,
79    // this list is checked for any parked Checkouts, and tries to notify
80    // them that the Conn could be used instead of waiting for a brand new
81    // connection.
82    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
83    // A oneshot channel is used to allow the interval to be notified when
84    // the Pool completely drops. That way, the interval can cancel immediately.
85    #[cfg(feature = "runtime")]
86    idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>,
87    #[cfg(feature = "runtime")]
88    exec: Exec,
89    timeout: Option<Duration>,
90}
91
92// This is because `Weak::new()` *allocates* space for `T`, even if it
93// doesn't need it!
94struct WeakOpt<T>(Option<Weak<T>>);
95
96#[derive(Clone, Copy, Debug)]
97pub(super) struct Config {
98    pub(super) idle_timeout: Option<Duration>,
99    pub(super) max_idle_per_host: usize,
100}
101
102impl Config {
103    pub(super) fn is_enabled(&self) -> bool {
104        self.max_idle_per_host > 0
105    }
106}
107
108impl<T> Pool<T> {
109    pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
110        let inner = if config.is_enabled() {
111            Some(Arc::new(Mutex::new(PoolInner {
112                connecting: HashSet::new(),
113                idle: HashMap::new(),
114                #[cfg(feature = "runtime")]
115                idle_interval_ref: None,
116                max_idle_per_host: config.max_idle_per_host,
117                waiters: HashMap::new(),
118                #[cfg(feature = "runtime")]
119                exec: __exec.clone(),
120                timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO),
121            })))
122        } else {
123            None
124        };
125
126        Pool { inner }
127    }
128
129    fn is_enabled(&self) -> bool {
130        self.inner.is_some()
131    }
132
133    #[cfg(test)]
134    pub(super) fn no_timer(&self) {
135        // Prevent an actual interval from being created for this pool...
136        #[cfg(feature = "runtime")]
137        {
138            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
139            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
140            let (tx, _) = oneshot::channel();
141            inner.idle_interval_ref = Some(tx);
142        }
143    }
144}
145
146impl<T: Poolable> Pool<T> {
147    /// Returns a `Checkout` which is a future that resolves if an idle
148    /// connection becomes available.
149    pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
150        Checkout {
151            key,
152            pool: self.clone(),
153            waiter: None,
154        }
155    }
156
157    /// Ensure that there is only ever 1 connecting task for HTTP/2
158    /// connections. This does nothing for HTTP/1.
159    pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
160        if ver == Ver::Http2 {
161            if let Some(ref enabled) = self.inner {
162                let mut inner = enabled.lock().unwrap();
163                return if inner.connecting.insert(key.clone()) {
164                    let connecting = Connecting {
165                        key: key.clone(),
166                        pool: WeakOpt::downgrade(enabled),
167                    };
168                    Some(connecting)
169                } else {
170                    trace!("HTTP/2 connecting already in progress for {:?}", key);
171                    None
172                };
173            }
174        }
175
176        // else
177        Some(Connecting {
178            key: key.clone(),
179            // in HTTP/1's case, there is never a lock, so we don't
180            // need to do anything in Drop.
181            pool: WeakOpt::none(),
182        })
183    }
184
185    #[cfg(test)]
186    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
187        self.inner.as_ref().expect("enabled").lock().expect("lock")
188    }
189
190    /* Used in client/tests.rs...
191    #[cfg(feature = "runtime")]
192    #[cfg(test)]
193    pub(super) fn h1_key(&self, s: &str) -> Key {
194        Arc::new(s.to_string())
195    }
196
197    #[cfg(feature = "runtime")]
198    #[cfg(test)]
199    pub(super) fn idle_count(&self, key: &Key) -> usize {
200        self
201            .locked()
202            .idle
203            .get(key)
204            .map(|list| list.len())
205            .unwrap_or(0)
206    }
207    */
208
209    pub(super) fn pooled(
210        &self,
211        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
212        value: T,
213    ) -> Pooled<T> {
214        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
215            match value.reserve() {
216                #[cfg(feature = "http2")]
217                Reservation::Shared(to_insert, to_return) => {
218                    let mut inner = enabled.lock().unwrap();
219                    inner.put(connecting.key.clone(), to_insert, enabled);
220                    // Do this here instead of Drop for Connecting because we
221                    // already have a lock, no need to lock the mutex twice.
222                    inner.connected(&connecting.key);
223                    // prevent the Drop of Connecting from repeating inner.connected()
224                    connecting.pool = WeakOpt::none();
225
226                    // Shared reservations don't need a reference to the pool,
227                    // since the pool always keeps a copy.
228                    (to_return, WeakOpt::none())
229                }
230                Reservation::Unique(value) => {
231                    // Unique reservations must take a reference to the pool
232                    // since they hope to reinsert once the reservation is
233                    // completed
234                    (value, WeakOpt::downgrade(enabled))
235                }
236            }
237        } else {
238            // If pool is not enabled, skip all the things...
239
240            // The Connecting should have had no pool ref
241            debug_assert!(connecting.pool.upgrade().is_none());
242
243            (value, WeakOpt::none())
244        };
245        Pooled {
246            key: connecting.key.clone(),
247            is_reused: false,
248            pool: pool_ref,
249            value: Some(value),
250        }
251    }
252
253    fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
254        debug!("reuse idle connection for {:?}", key);
255        // TODO: unhack this
256        // In Pool::pooled(), which is used for inserting brand new connections,
257        // there's some code that adjusts the pool reference taken depending
258        // on if the Reservation can be shared or is unique. By the time
259        // reuse() is called, the reservation has already been made, and
260        // we just have the final value, without knowledge of if this is
261        // unique or shared. So, the hack is to just assume Ver::Http2 means
262        // shared... :(
263        let mut pool_ref = WeakOpt::none();
264        if !value.can_share() {
265            if let Some(ref enabled) = self.inner {
266                pool_ref = WeakOpt::downgrade(enabled);
267            }
268        }
269
270        Pooled {
271            is_reused: true,
272            key: key.clone(),
273            pool: pool_ref,
274            value: Some(value),
275        }
276    }
277}
278
279/// Pop off this list, looking for a usable connection that hasn't expired.
280struct IdlePopper<'a, T> {
281    key: &'a Key,
282    list: &'a mut Vec<Idle<T>>,
283}
284
285impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
286    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
287        while let Some(entry) = self.list.pop() {
288            // If the connection has been closed, or is older than our idle
289            // timeout, simply drop it and keep looking...
290            if !entry.value.is_open() {
291                trace!("removing closed connection for {:?}", self.key);
292                continue;
293            }
294            // TODO: Actually, since the `idle` list is pushed to the end always,
295            // that would imply that if *this* entry is expired, then anything
296            // "earlier" in the list would *have* to be expired also... Right?
297            //
298            // In that case, we could just break out of the loop and drop the
299            // whole list...
300            if expiration.expires(entry.idle_at) {
301                trace!("removing expired connection for {:?}", self.key);
302                continue;
303            }
304
305            let value = match entry.value.reserve() {
306                #[cfg(feature = "http2")]
307                Reservation::Shared(to_reinsert, to_checkout) => {
308                    self.list.push(Idle {
309                        idle_at: Instant::now(),
310                        value: to_reinsert,
311                    });
312                    to_checkout
313                }
314                Reservation::Unique(unique) => unique,
315            };
316
317            return Some(Idle {
318                idle_at: entry.idle_at,
319                value,
320            });
321        }
322
323        None
324    }
325}
326
327impl<T: Poolable> PoolInner<T> {
328    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
329        if value.can_share() && self.idle.contains_key(&key) {
330            trace!("put; existing idle HTTP/2 connection for {:?}", key);
331            return;
332        }
333        trace!("put; add idle connection for {:?}", key);
334        let mut remove_waiters = false;
335        let mut value = Some(value);
336        if let Some(waiters) = self.waiters.get_mut(&key) {
337            while let Some(tx) = waiters.pop_front() {
338                if !tx.is_canceled() {
339                    let reserved = value.take().expect("value already sent");
340                    let reserved = match reserved.reserve() {
341                        #[cfg(feature = "http2")]
342                        Reservation::Shared(to_keep, to_send) => {
343                            value = Some(to_keep);
344                            to_send
345                        }
346                        Reservation::Unique(uniq) => uniq,
347                    };
348                    match tx.send(reserved) {
349                        Ok(()) => {
350                            if value.is_none() {
351                                break;
352                            } else {
353                                continue;
354                            }
355                        }
356                        Err(e) => {
357                            value = Some(e);
358                        }
359                    }
360                }
361
362                trace!("put; removing canceled waiter for {:?}", key);
363            }
364            remove_waiters = waiters.is_empty();
365        }
366        if remove_waiters {
367            self.waiters.remove(&key);
368        }
369
370        match value {
371            Some(value) => {
372                // borrow-check scope...
373                {
374                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
375                    if self.max_idle_per_host <= idle_list.len() {
376                        trace!("max idle per host for {:?}, dropping connection", key);
377                        return;
378                    }
379
380                    debug!("pooling idle connection for {:?}", key);
381                    idle_list.push(Idle {
382                        value,
383                        idle_at: Instant::now(),
384                    });
385                }
386
387                #[cfg(feature = "runtime")]
388                {
389                    self.spawn_idle_interval(__pool_ref);
390                }
391            }
392            None => trace!("put; found waiter for {:?}", key),
393        }
394    }
395
396    /// A `Connecting` task is complete. Not necessarily successfully,
397    /// but the lock is going away, so clean up.
398    fn connected(&mut self, key: &Key) {
399        let existed = self.connecting.remove(key);
400        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
401        // cancel any waiters. if there are any, it's because
402        // this Connecting task didn't complete successfully.
403        // those waiters would never receive a connection.
404        self.waiters.remove(key);
405    }
406
407    #[cfg(feature = "runtime")]
408    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
409        let (dur, rx) = {
410            if self.idle_interval_ref.is_some() {
411                return;
412            }
413
414            if let Some(dur) = self.timeout {
415                let (tx, rx) = oneshot::channel();
416                self.idle_interval_ref = Some(tx);
417                (dur, rx)
418            } else {
419                return;
420            }
421        };
422
423        let interval = IdleTask {
424            interval: tokio::time::interval(dur),
425            pool: WeakOpt::downgrade(pool_ref),
426            pool_drop_notifier: rx,
427        };
428
429        self.exec.execute(interval);
430    }
431}
432
433impl<T> PoolInner<T> {
434    /// Any `FutureResponse`s that were created will have made a `Checkout`,
435    /// and possibly inserted into the pool that it is waiting for an idle
436    /// connection. If a user ever dropped that future, we need to clean out
437    /// those parked senders.
438    fn clean_waiters(&mut self, key: &Key) {
439        let mut remove_waiters = false;
440        if let Some(waiters) = self.waiters.get_mut(key) {
441            waiters.retain(|tx| !tx.is_canceled());
442            remove_waiters = waiters.is_empty();
443        }
444        if remove_waiters {
445            self.waiters.remove(key);
446        }
447    }
448}
449
450#[cfg(feature = "runtime")]
451impl<T: Poolable> PoolInner<T> {
452    /// This should *only* be called by the IdleTask
453    fn clear_expired(&mut self) {
454        let dur = self.timeout.expect("interval assumes timeout");
455
456        let now = Instant::now();
457        //self.last_idle_check_at = now;
458
459        self.idle.retain(|key, values| {
460            values.retain(|entry| {
461                if !entry.value.is_open() {
462                    trace!("idle interval evicting closed for {:?}", key);
463                    return false;
464                }
465
466                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
467                if now.saturating_duration_since(entry.idle_at) > dur {
468                    trace!("idle interval evicting expired for {:?}", key);
469                    return false;
470                }
471
472                // Otherwise, keep this value...
473                true
474            });
475
476            // returning false evicts this key/val
477            !values.is_empty()
478        });
479    }
480}
481
482impl<T> Clone for Pool<T> {
483    fn clone(&self) -> Pool<T> {
484        Pool {
485            inner: self.inner.clone(),
486        }
487    }
488}
489
490/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
491// Note: The bounds `T: Poolable` is needed for the Drop impl.
492pub(super) struct Pooled<T: Poolable> {
493    value: Option<T>,
494    is_reused: bool,
495    key: Key,
496    pool: WeakOpt<Mutex<PoolInner<T>>>,
497}
498
499impl<T: Poolable> Pooled<T> {
500    pub(super) fn is_reused(&self) -> bool {
501        self.is_reused
502    }
503
504    pub(super) fn is_pool_enabled(&self) -> bool {
505        self.pool.0.is_some()
506    }
507
508    fn as_ref(&self) -> &T {
509        self.value.as_ref().expect("not dropped")
510    }
511
512    fn as_mut(&mut self) -> &mut T {
513        self.value.as_mut().expect("not dropped")
514    }
515}
516
517impl<T: Poolable> Deref for Pooled<T> {
518    type Target = T;
519    fn deref(&self) -> &T {
520        self.as_ref()
521    }
522}
523
524impl<T: Poolable> DerefMut for Pooled<T> {
525    fn deref_mut(&mut self) -> &mut T {
526        self.as_mut()
527    }
528}
529
530impl<T: Poolable> Drop for Pooled<T> {
531    fn drop(&mut self) {
532        if let Some(value) = self.value.take() {
533            if !value.is_open() {
534                // If we *already* know the connection is done here,
535                // it shouldn't be re-inserted back into the pool.
536                return;
537            }
538
539            if let Some(pool) = self.pool.upgrade() {
540                if let Ok(mut inner) = pool.lock() {
541                    inner.put(self.key.clone(), value, &pool);
542                }
543            } else if !value.can_share() {
544                trace!("pool dropped, dropping pooled ({:?})", self.key);
545            }
546            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
547            // have an actual reference to the Pool.
548        }
549    }
550}
551
552impl<T: Poolable> fmt::Debug for Pooled<T> {
553    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554        f.debug_struct("Pooled").field("key", &self.key).finish()
555    }
556}
557
558struct Idle<T> {
559    idle_at: Instant,
560    value: T,
561}
562
563// FIXME: allow() required due to `impl Trait` leaking types to this lint
564#[allow(missing_debug_implementations)]
565pub(super) struct Checkout<T> {
566    key: Key,
567    pool: Pool<T>,
568    waiter: Option<oneshot::Receiver<T>>,
569}
570
571#[derive(Debug)]
572pub(super) struct CheckoutIsClosedError;
573
574impl StdError for CheckoutIsClosedError {}
575
576impl fmt::Display for CheckoutIsClosedError {
577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578        f.write_str("checked out connection was closed")
579    }
580}
581
582impl<T: Poolable> Checkout<T> {
583    fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
584        if let Some(mut rx) = self.waiter.take() {
585            match Pin::new(&mut rx).poll(cx) {
586                Poll::Ready(Ok(value)) => {
587                    if value.is_open() {
588                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
589                    } else {
590                        Poll::Ready(Some(Err(
591                            crate::Error::new_canceled().with(CheckoutIsClosedError)
592                        )))
593                    }
594                }
595                Poll::Pending => {
596                    self.waiter = Some(rx);
597                    Poll::Pending
598                }
599                Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
600                    crate::Error::new_canceled().with("request has been canceled")
601                ))),
602            }
603        } else {
604            Poll::Ready(None)
605        }
606    }
607
608    fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> {
609        let entry = {
610            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
611            let expiration = Expiration::new(inner.timeout);
612            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
613                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
614                // A block to end the mutable borrow on list,
615                // so the map below can check is_empty()
616                {
617                    let popper = IdlePopper {
618                        key: &self.key,
619                        list,
620                    };
621                    popper.pop(&expiration)
622                }
623                .map(|e| (e, list.is_empty()))
624            });
625
626            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
627                (Some(e), empty)
628            } else {
629                // No entry found means nuke the list for sure.
630                (None, true)
631            };
632            if empty {
633                //TODO: This could be done with the HashMap::entry API instead.
634                inner.idle.remove(&self.key);
635            }
636
637            if entry.is_none() && self.waiter.is_none() {
638                let (tx, mut rx) = oneshot::channel();
639                trace!("checkout waiting for idle connection: {:?}", self.key);
640                inner
641                    .waiters
642                    .entry(self.key.clone())
643                    .or_insert_with(VecDeque::new)
644                    .push_back(tx);
645
646                // register the waker with this oneshot
647                assert!(Pin::new(&mut rx).poll(cx).is_pending());
648                self.waiter = Some(rx);
649            }
650
651            entry
652        };
653
654        entry.map(|e| self.pool.reuse(&self.key, e.value))
655    }
656}
657
658impl<T: Poolable> Future for Checkout<T> {
659    type Output = crate::Result<Pooled<T>>;
660
661    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
662        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
663            return Poll::Ready(Ok(pooled));
664        }
665
666        if let Some(pooled) = self.checkout(cx) {
667            Poll::Ready(Ok(pooled))
668        } else if !self.pool.is_enabled() {
669            Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
670        } else {
671            // There's a new waiter, already registered in self.checkout()
672            debug_assert!(self.waiter.is_some());
673            Poll::Pending
674        }
675    }
676}
677
678impl<T> Drop for Checkout<T> {
679    fn drop(&mut self) {
680        if self.waiter.take().is_some() {
681            trace!("checkout dropped for {:?}", self.key);
682            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
683                inner.clean_waiters(&self.key);
684            }
685        }
686    }
687}
688
689// FIXME: allow() required due to `impl Trait` leaking types to this lint
690#[allow(missing_debug_implementations)]
691pub(super) struct Connecting<T: Poolable> {
692    key: Key,
693    pool: WeakOpt<Mutex<PoolInner<T>>>,
694}
695
696impl<T: Poolable> Connecting<T> {
697    pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
698        debug_assert!(
699            self.pool.0.is_none(),
700            "Connecting::alpn_h2 but already Http2"
701        );
702
703        pool.connecting(&self.key, Ver::Http2)
704    }
705}
706
707impl<T: Poolable> Drop for Connecting<T> {
708    fn drop(&mut self) {
709        if let Some(pool) = self.pool.upgrade() {
710            // No need to panic on drop, that could abort!
711            if let Ok(mut inner) = pool.lock() {
712                inner.connected(&self.key);
713            }
714        }
715    }
716}
717
718struct Expiration(Option<Duration>);
719
720impl Expiration {
721    fn new(dur: Option<Duration>) -> Expiration {
722        Expiration(dur)
723    }
724
725    fn expires(&self, instant: Instant) -> bool {
726        match self.0 {
727            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
728            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
729            None => false,
730        }
731    }
732}
733
734#[cfg(feature = "runtime")]
735pin_project_lite::pin_project! {
736    struct IdleTask<T> {
737        #[pin]
738        interval: Interval,
739        pool: WeakOpt<Mutex<PoolInner<T>>>,
740        // This allows the IdleTask to be notified as soon as the entire
741        // Pool is fully dropped, and shutdown. This channel is never sent on,
742        // but Err(Canceled) will be received when the Pool is dropped.
743        #[pin]
744        pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>,
745    }
746}
747
748#[cfg(feature = "runtime")]
749impl<T: Poolable + 'static> Future for IdleTask<T> {
750    type Output = ();
751
752    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
753        let mut this = self.project();
754        loop {
755            match this.pool_drop_notifier.as_mut().poll(cx) {
756                Poll::Ready(Ok(n)) => match n {},
757                Poll::Pending => (),
758                Poll::Ready(Err(_canceled)) => {
759                    trace!("pool closed, canceling idle interval");
760                    return Poll::Ready(());
761                }
762            }
763
764            ready!(this.interval.as_mut().poll_tick(cx));
765
766            if let Some(inner) = this.pool.upgrade() {
767                if let Ok(mut inner) = inner.lock() {
768                    trace!("idle interval checking for expired");
769                    inner.clear_expired();
770                    continue;
771                }
772            }
773            return Poll::Ready(());
774        }
775    }
776}
777
778impl<T> WeakOpt<T> {
779    fn none() -> Self {
780        WeakOpt(None)
781    }
782
783    fn downgrade(arc: &Arc<T>) -> Self {
784        WeakOpt(Some(Arc::downgrade(arc)))
785    }
786
787    fn upgrade(&self) -> Option<Arc<T>> {
788        self.0.as_ref().and_then(Weak::upgrade)
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use std::future::Future;
795    use std::pin::Pin;
796    use std::task::Context;
797    use std::task::Poll;
798    use std::time::Duration;
799
800    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
801    use crate::common::exec::Exec;
802
803    /// Test unique reservations.
804    #[derive(Debug, PartialEq, Eq)]
805    struct Uniq<T>(T);
806
807    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
808        fn is_open(&self) -> bool {
809            true
810        }
811
812        fn reserve(self) -> Reservation<Self> {
813            Reservation::Unique(self)
814        }
815
816        fn can_share(&self) -> bool {
817            false
818        }
819    }
820
821    fn c<T: Poolable>(key: Key) -> Connecting<T> {
822        Connecting {
823            key,
824            pool: WeakOpt::none(),
825        }
826    }
827
828    fn host_key(s: &str) -> Key {
829        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
830    }
831
832    fn pool_no_timer<T>() -> Pool<T> {
833        pool_max_idle_no_timer(::std::usize::MAX)
834    }
835
836    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
837        let pool = Pool::new(
838            super::Config {
839                idle_timeout: Some(Duration::from_millis(100)),
840                max_idle_per_host: max_idle,
841            },
842            &Exec::Default,
843        );
844        pool.no_timer();
845        pool
846    }
847
848    #[tokio::test]
849    async fn test_pool_checkout_smoke() {
850        let pool = pool_no_timer();
851        let key = host_key("foo");
852        let pooled = pool.pooled(c(key.clone()), Uniq(41));
853
854        drop(pooled);
855
856        match pool.checkout(key).await {
857            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
858            Err(_) => panic!("not ready"),
859        };
860    }
861
862    /// Helper to check if the future is ready after polling once.
863    struct PollOnce<'a, F>(&'a mut F);
864
865    impl<F, T, U> Future for PollOnce<'_, F>
866    where
867        F: Future<Output = Result<T, U>> + Unpin,
868    {
869        type Output = Option<()>;
870
871        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
872            match Pin::new(&mut self.0).poll(cx) {
873                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
874                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
875                Poll::Pending => Poll::Ready(None),
876            }
877        }
878    }
879
880    #[tokio::test]
881    async fn test_pool_checkout_returns_none_if_expired() {
882        let pool = pool_no_timer();
883        let key = host_key("foo");
884        let pooled = pool.pooled(c(key.clone()), Uniq(41));
885
886        drop(pooled);
887        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
888        let mut checkout = pool.checkout(key);
889        let poll_once = PollOnce(&mut checkout);
890        let is_not_ready = poll_once.await.is_none();
891        assert!(is_not_ready);
892    }
893
894    #[cfg(feature = "runtime")]
895    #[tokio::test]
896    async fn test_pool_checkout_removes_expired() {
897        let pool = pool_no_timer();
898        let key = host_key("foo");
899
900        pool.pooled(c(key.clone()), Uniq(41));
901        pool.pooled(c(key.clone()), Uniq(5));
902        pool.pooled(c(key.clone()), Uniq(99));
903
904        assert_eq!(
905            pool.locked().idle.get(&key).map(|entries| entries.len()),
906            Some(3)
907        );
908        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
909
910        let mut checkout = pool.checkout(key.clone());
911        let poll_once = PollOnce(&mut checkout);
912        // checkout.await should clean out the expired
913        poll_once.await;
914        assert!(pool.locked().idle.get(&key).is_none());
915    }
916
917    #[test]
918    fn test_pool_max_idle_per_host() {
919        let pool = pool_max_idle_no_timer(2);
920        let key = host_key("foo");
921
922        pool.pooled(c(key.clone()), Uniq(41));
923        pool.pooled(c(key.clone()), Uniq(5));
924        pool.pooled(c(key.clone()), Uniq(99));
925
926        // pooled and dropped 3, max_idle should only allow 2
927        assert_eq!(
928            pool.locked().idle.get(&key).map(|entries| entries.len()),
929            Some(2)
930        );
931    }
932
933    #[cfg(feature = "runtime")]
934    #[tokio::test]
935    async fn test_pool_timer_removes_expired() {
936        let _ = pretty_env_logger::try_init();
937        tokio::time::pause();
938
939        let pool = Pool::new(
940            super::Config {
941                idle_timeout: Some(Duration::from_millis(10)),
942                max_idle_per_host: std::usize::MAX,
943            },
944            &Exec::Default,
945        );
946
947        let key = host_key("foo");
948
949        pool.pooled(c(key.clone()), Uniq(41));
950        pool.pooled(c(key.clone()), Uniq(5));
951        pool.pooled(c(key.clone()), Uniq(99));
952
953        assert_eq!(
954            pool.locked().idle.get(&key).map(|entries| entries.len()),
955            Some(3)
956        );
957
958        // Let the timer tick passed the expiration...
959        tokio::time::advance(Duration::from_millis(30)).await;
960        // Yield so the Interval can reap...
961        tokio::task::yield_now().await;
962
963        assert!(pool.locked().idle.get(&key).is_none());
964    }
965
966    #[tokio::test]
967    async fn test_pool_checkout_task_unparked() {
968        use futures_util::future::join;
969        use futures_util::FutureExt;
970
971        let pool = pool_no_timer();
972        let key = host_key("foo");
973        let pooled = pool.pooled(c(key.clone()), Uniq(41));
974
975        let checkout = join(pool.checkout(key), async {
976            // the checkout future will park first,
977            // and then this lazy future will be polled, which will insert
978            // the pooled back into the pool
979            //
980            // this test makes sure that doing so will unpark the checkout
981            drop(pooled);
982        })
983        .map(|(entry, _)| entry);
984
985        assert_eq!(*checkout.await.unwrap(), Uniq(41));
986    }
987
988    #[tokio::test]
989    async fn test_pool_checkout_drop_cleans_up_waiters() {
990        let pool = pool_no_timer::<Uniq<i32>>();
991        let key = host_key("foo");
992
993        let mut checkout1 = pool.checkout(key.clone());
994        let mut checkout2 = pool.checkout(key.clone());
995
996        let poll_once1 = PollOnce(&mut checkout1);
997        let poll_once2 = PollOnce(&mut checkout2);
998
999        // first poll needed to get into Pool's parked
1000        poll_once1.await;
1001        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1002        poll_once2.await;
1003        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1004
1005        // on drop, clean up Pool
1006        drop(checkout1);
1007        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1008
1009        drop(checkout2);
1010        assert!(pool.locked().waiters.get(&key).is_none());
1011    }
1012
1013    #[derive(Debug)]
1014    struct CanClose {
1015        #[allow(unused)]
1016        val: i32,
1017        closed: bool,
1018    }
1019
1020    impl Poolable for CanClose {
1021        fn is_open(&self) -> bool {
1022            !self.closed
1023        }
1024
1025        fn reserve(self) -> Reservation<Self> {
1026            Reservation::Unique(self)
1027        }
1028
1029        fn can_share(&self) -> bool {
1030            false
1031        }
1032    }
1033
1034    #[test]
1035    fn pooled_drop_if_closed_doesnt_reinsert() {
1036        let pool = pool_no_timer();
1037        let key = host_key("foo");
1038        pool.pooled(
1039            c(key.clone()),
1040            CanClose {
1041                val: 57,
1042                closed: true,
1043            },
1044        );
1045
1046        assert!(!pool.locked().idle.contains_key(&key));
1047    }
1048}