1use std::collections::{HashMap, HashSet, VecDeque};
2use std::error::Error as StdError;
3use std::fmt;
4use std::future::Future;
5use std::marker::Unpin;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, Weak};
9use std::task::{Context, Poll};
10
11#[cfg(not(feature = "runtime"))]
12use std::time::{Duration, Instant};
13
14use futures_channel::oneshot;
15#[cfg(feature = "runtime")]
16use tokio::time::{Duration, Instant, Interval};
17use tracing::{debug, trace};
18
19use super::client::Ver;
20use crate::common::exec::Exec;
21
22#[allow(missing_debug_implementations)]
24pub(super) struct Pool<T> {
25 inner: Option<Arc<Mutex<PoolInner<T>>>>,
27}
28
29pub(super) trait Poolable: Unpin + Send + Sized + 'static {
35 fn is_open(&self) -> bool;
36 fn reserve(self) -> Reservation<Self>;
40 fn can_share(&self) -> bool;
41}
42
43#[allow(missing_debug_implementations)]
50pub(super) enum Reservation<T> {
51 #[cfg(feature = "http2")]
55 Shared(T, T),
56 Unique(T),
59}
60
61pub(super) type Key = (http::uri::Scheme, http::uri::Authority); struct PoolInner<T> {
65 connecting: HashSet<Key>,
69 idle: HashMap<Key, Vec<Idle<T>>>,
72 max_idle_per_host: usize,
73 waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
83 #[cfg(feature = "runtime")]
86 idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>,
87 #[cfg(feature = "runtime")]
88 exec: Exec,
89 timeout: Option<Duration>,
90}
91
/// An optional weak reference: `None` means "no pool to refer to", `Some`
/// holds a `Weak` that may or may not still be upgradable.
struct WeakOpt<T>(Option<Weak<T>>);
95
96#[derive(Clone, Copy, Debug)]
97pub(super) struct Config {
98 pub(super) idle_timeout: Option<Duration>,
99 pub(super) max_idle_per_host: usize,
100}
101
102impl Config {
103 pub(super) fn is_enabled(&self) -> bool {
104 self.max_idle_per_host > 0
105 }
106}
107
108impl<T> Pool<T> {
109 pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
110 let inner = if config.is_enabled() {
111 Some(Arc::new(Mutex::new(PoolInner {
112 connecting: HashSet::new(),
113 idle: HashMap::new(),
114 #[cfg(feature = "runtime")]
115 idle_interval_ref: None,
116 max_idle_per_host: config.max_idle_per_host,
117 waiters: HashMap::new(),
118 #[cfg(feature = "runtime")]
119 exec: __exec.clone(),
120 timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO),
121 })))
122 } else {
123 None
124 };
125
126 Pool { inner }
127 }
128
129 fn is_enabled(&self) -> bool {
130 self.inner.is_some()
131 }
132
133 #[cfg(test)]
134 pub(super) fn no_timer(&self) {
135 #[cfg(feature = "runtime")]
137 {
138 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
139 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
140 let (tx, _) = oneshot::channel();
141 inner.idle_interval_ref = Some(tx);
142 }
143 }
144}
145
146impl<T: Poolable> Pool<T> {
147 pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
150 Checkout {
151 key,
152 pool: self.clone(),
153 waiter: None,
154 }
155 }
156
157 pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
160 if ver == Ver::Http2 {
161 if let Some(ref enabled) = self.inner {
162 let mut inner = enabled.lock().unwrap();
163 return if inner.connecting.insert(key.clone()) {
164 let connecting = Connecting {
165 key: key.clone(),
166 pool: WeakOpt::downgrade(enabled),
167 };
168 Some(connecting)
169 } else {
170 trace!("HTTP/2 connecting already in progress for {:?}", key);
171 None
172 };
173 }
174 }
175
176 Some(Connecting {
178 key: key.clone(),
179 pool: WeakOpt::none(),
182 })
183 }
184
185 #[cfg(test)]
186 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
187 self.inner.as_ref().expect("enabled").lock().expect("lock")
188 }
189
190 pub(super) fn pooled(
210 &self,
211 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
212 value: T,
213 ) -> Pooled<T> {
214 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
215 match value.reserve() {
216 #[cfg(feature = "http2")]
217 Reservation::Shared(to_insert, to_return) => {
218 let mut inner = enabled.lock().unwrap();
219 inner.put(connecting.key.clone(), to_insert, enabled);
220 inner.connected(&connecting.key);
223 connecting.pool = WeakOpt::none();
225
226 (to_return, WeakOpt::none())
229 }
230 Reservation::Unique(value) => {
231 (value, WeakOpt::downgrade(enabled))
235 }
236 }
237 } else {
238 debug_assert!(connecting.pool.upgrade().is_none());
242
243 (value, WeakOpt::none())
244 };
245 Pooled {
246 key: connecting.key.clone(),
247 is_reused: false,
248 pool: pool_ref,
249 value: Some(value),
250 }
251 }
252
253 fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
254 debug!("reuse idle connection for {:?}", key);
255 let mut pool_ref = WeakOpt::none();
264 if !value.can_share() {
265 if let Some(ref enabled) = self.inner {
266 pool_ref = WeakOpt::downgrade(enabled);
267 }
268 }
269
270 Pooled {
271 is_reused: true,
272 key: key.clone(),
273 pool: pool_ref,
274 value: Some(value),
275 }
276 }
277}
278
279struct IdlePopper<'a, T> {
281 key: &'a Key,
282 list: &'a mut Vec<Idle<T>>,
283}
284
285impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
286 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
287 while let Some(entry) = self.list.pop() {
288 if !entry.value.is_open() {
291 trace!("removing closed connection for {:?}", self.key);
292 continue;
293 }
294 if expiration.expires(entry.idle_at) {
301 trace!("removing expired connection for {:?}", self.key);
302 continue;
303 }
304
305 let value = match entry.value.reserve() {
306 #[cfg(feature = "http2")]
307 Reservation::Shared(to_reinsert, to_checkout) => {
308 self.list.push(Idle {
309 idle_at: Instant::now(),
310 value: to_reinsert,
311 });
312 to_checkout
313 }
314 Reservation::Unique(unique) => unique,
315 };
316
317 return Some(Idle {
318 idle_at: entry.idle_at,
319 value,
320 });
321 }
322
323 None
324 }
325}
326
327impl<T: Poolable> PoolInner<T> {
328 fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
329 if value.can_share() && self.idle.contains_key(&key) {
330 trace!("put; existing idle HTTP/2 connection for {:?}", key);
331 return;
332 }
333 trace!("put; add idle connection for {:?}", key);
334 let mut remove_waiters = false;
335 let mut value = Some(value);
336 if let Some(waiters) = self.waiters.get_mut(&key) {
337 while let Some(tx) = waiters.pop_front() {
338 if !tx.is_canceled() {
339 let reserved = value.take().expect("value already sent");
340 let reserved = match reserved.reserve() {
341 #[cfg(feature = "http2")]
342 Reservation::Shared(to_keep, to_send) => {
343 value = Some(to_keep);
344 to_send
345 }
346 Reservation::Unique(uniq) => uniq,
347 };
348 match tx.send(reserved) {
349 Ok(()) => {
350 if value.is_none() {
351 break;
352 } else {
353 continue;
354 }
355 }
356 Err(e) => {
357 value = Some(e);
358 }
359 }
360 }
361
362 trace!("put; removing canceled waiter for {:?}", key);
363 }
364 remove_waiters = waiters.is_empty();
365 }
366 if remove_waiters {
367 self.waiters.remove(&key);
368 }
369
370 match value {
371 Some(value) => {
372 {
374 let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
375 if self.max_idle_per_host <= idle_list.len() {
376 trace!("max idle per host for {:?}, dropping connection", key);
377 return;
378 }
379
380 debug!("pooling idle connection for {:?}", key);
381 idle_list.push(Idle {
382 value,
383 idle_at: Instant::now(),
384 });
385 }
386
387 #[cfg(feature = "runtime")]
388 {
389 self.spawn_idle_interval(__pool_ref);
390 }
391 }
392 None => trace!("put; found waiter for {:?}", key),
393 }
394 }
395
396 fn connected(&mut self, key: &Key) {
399 let existed = self.connecting.remove(key);
400 debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
401 self.waiters.remove(key);
405 }
406
407 #[cfg(feature = "runtime")]
408 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
409 let (dur, rx) = {
410 if self.idle_interval_ref.is_some() {
411 return;
412 }
413
414 if let Some(dur) = self.timeout {
415 let (tx, rx) = oneshot::channel();
416 self.idle_interval_ref = Some(tx);
417 (dur, rx)
418 } else {
419 return;
420 }
421 };
422
423 let interval = IdleTask {
424 interval: tokio::time::interval(dur),
425 pool: WeakOpt::downgrade(pool_ref),
426 pool_drop_notifier: rx,
427 };
428
429 self.exec.execute(interval);
430 }
431}
432
433impl<T> PoolInner<T> {
434 fn clean_waiters(&mut self, key: &Key) {
439 let mut remove_waiters = false;
440 if let Some(waiters) = self.waiters.get_mut(key) {
441 waiters.retain(|tx| !tx.is_canceled());
442 remove_waiters = waiters.is_empty();
443 }
444 if remove_waiters {
445 self.waiters.remove(key);
446 }
447 }
448}
449
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// Evicts every idle entry that is closed or has been idle for longer
    /// than the configured timeout, and drops keys whose list becomes empty.
    ///
    /// Panics if no timeout is configured.
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");
        let now = Instant::now();

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    false
                } else if now.saturating_duration_since(entry.idle_at) > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    false
                } else {
                    true
                }
            });

            // An empty list means the whole key is removed from the map.
            !values.is_empty()
        });
    }
}
481
482impl<T> Clone for Pool<T> {
483 fn clone(&self) -> Pool<T> {
484 Pool {
485 inner: self.inner.clone(),
486 }
487 }
488}
489
490pub(super) struct Pooled<T: Poolable> {
493 value: Option<T>,
494 is_reused: bool,
495 key: Key,
496 pool: WeakOpt<Mutex<PoolInner<T>>>,
497}
498
499impl<T: Poolable> Pooled<T> {
500 pub(super) fn is_reused(&self) -> bool {
501 self.is_reused
502 }
503
504 pub(super) fn is_pool_enabled(&self) -> bool {
505 self.pool.0.is_some()
506 }
507
508 fn as_ref(&self) -> &T {
509 self.value.as_ref().expect("not dropped")
510 }
511
512 fn as_mut(&mut self) -> &mut T {
513 self.value.as_mut().expect("not dropped")
514 }
515}
516
517impl<T: Poolable> Deref for Pooled<T> {
518 type Target = T;
519 fn deref(&self) -> &T {
520 self.as_ref()
521 }
522}
523
524impl<T: Poolable> DerefMut for Pooled<T> {
525 fn deref_mut(&mut self) -> &mut T {
526 self.as_mut()
527 }
528}
529
530impl<T: Poolable> Drop for Pooled<T> {
531 fn drop(&mut self) {
532 if let Some(value) = self.value.take() {
533 if !value.is_open() {
534 return;
537 }
538
539 if let Some(pool) = self.pool.upgrade() {
540 if let Ok(mut inner) = pool.lock() {
541 inner.put(self.key.clone(), value, &pool);
542 }
543 } else if !value.can_share() {
544 trace!("pool dropped, dropping pooled ({:?})", self.key);
545 }
546 }
549 }
550}
551
552impl<T: Poolable> fmt::Debug for Pooled<T> {
553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554 f.debug_struct("Pooled").field("key", &self.key).finish()
555 }
556}
557
/// An idle value together with the instant it was placed in the idle list.
struct Idle<T> {
    // When the value became idle; compared against the idle timeout.
    idle_at: Instant,
    // The pooled value itself.
    value: T,
}
562
563#[allow(missing_debug_implementations)]
565pub(super) struct Checkout<T> {
566 key: Key,
567 pool: Pool<T>,
568 waiter: Option<oneshot::Receiver<T>>,
569}
570
571#[derive(Debug)]
572pub(super) struct CheckoutIsClosedError;
573
574impl StdError for CheckoutIsClosedError {}
575
576impl fmt::Display for CheckoutIsClosedError {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 f.write_str("checked out connection was closed")
579 }
580}
581
582impl<T: Poolable> Checkout<T> {
583 fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
584 if let Some(mut rx) = self.waiter.take() {
585 match Pin::new(&mut rx).poll(cx) {
586 Poll::Ready(Ok(value)) => {
587 if value.is_open() {
588 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
589 } else {
590 Poll::Ready(Some(Err(
591 crate::Error::new_canceled().with(CheckoutIsClosedError)
592 )))
593 }
594 }
595 Poll::Pending => {
596 self.waiter = Some(rx);
597 Poll::Pending
598 }
599 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
600 crate::Error::new_canceled().with("request has been canceled")
601 ))),
602 }
603 } else {
604 Poll::Ready(None)
605 }
606 }
607
608 fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> {
609 let entry = {
610 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
611 let expiration = Expiration::new(inner.timeout);
612 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
613 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
614 {
617 let popper = IdlePopper {
618 key: &self.key,
619 list,
620 };
621 popper.pop(&expiration)
622 }
623 .map(|e| (e, list.is_empty()))
624 });
625
626 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
627 (Some(e), empty)
628 } else {
629 (None, true)
631 };
632 if empty {
633 inner.idle.remove(&self.key);
635 }
636
637 if entry.is_none() && self.waiter.is_none() {
638 let (tx, mut rx) = oneshot::channel();
639 trace!("checkout waiting for idle connection: {:?}", self.key);
640 inner
641 .waiters
642 .entry(self.key.clone())
643 .or_insert_with(VecDeque::new)
644 .push_back(tx);
645
646 assert!(Pin::new(&mut rx).poll(cx).is_pending());
648 self.waiter = Some(rx);
649 }
650
651 entry
652 };
653
654 entry.map(|e| self.pool.reuse(&self.key, e.value))
655 }
656}
657
658impl<T: Poolable> Future for Checkout<T> {
659 type Output = crate::Result<Pooled<T>>;
660
661 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
662 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
663 return Poll::Ready(Ok(pooled));
664 }
665
666 if let Some(pooled) = self.checkout(cx) {
667 Poll::Ready(Ok(pooled))
668 } else if !self.pool.is_enabled() {
669 Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
670 } else {
671 debug_assert!(self.waiter.is_some());
673 Poll::Pending
674 }
675 }
676}
677
678impl<T> Drop for Checkout<T> {
679 fn drop(&mut self) {
680 if self.waiter.take().is_some() {
681 trace!("checkout dropped for {:?}", self.key);
682 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
683 inner.clean_waiters(&self.key);
684 }
685 }
686 }
687}
688
689#[allow(missing_debug_implementations)]
691pub(super) struct Connecting<T: Poolable> {
692 key: Key,
693 pool: WeakOpt<Mutex<PoolInner<T>>>,
694}
695
696impl<T: Poolable> Connecting<T> {
697 pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
698 debug_assert!(
699 self.pool.0.is_none(),
700 "Connecting::alpn_h2 but already Http2"
701 );
702
703 pool.connecting(&self.key, Ver::Http2)
704 }
705}
706
707impl<T: Poolable> Drop for Connecting<T> {
708 fn drop(&mut self) {
709 if let Some(pool) = self.pool.upgrade() {
710 if let Ok(mut inner) = pool.lock() {
712 inner.connected(&self.key);
713 }
714 }
715 }
716}
717
/// Idle-timeout check; `None` means entries never expire.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps an optional idle timeout.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` if strictly more than the timeout has elapsed since
    /// `instant`. Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        // `saturating_duration_since` yields zero instead of panicking when
        // `instant` is later than now.
        self.0.map_or(false, |timeout| {
            Instant::now().saturating_duration_since(instant) > timeout
        })
    }
}
733
// Background future that periodically evicts expired idle entries.
#[cfg(feature = "runtime")]
pin_project_lite::pin_project! {
    struct IdleTask<T> {
        // Ticks at the pool's idle timeout.
        #[pin]
        interval: Interval,
        // Weak link to the pool state; the task ends once it can no longer
        // be upgraded.
        pool: WeakOpt<Mutex<PoolInner<T>>>,
        // Never receives a value (`Infallible`); it resolves with an error
        // when the sender stored in `PoolInner::idle_interval_ref` is
        // dropped, which ends the task.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>,
    }
}
747
/// Runs until the pool goes away: on every interval tick the pool is locked
/// and expired entries are cleared.
#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            // Check for pool shutdown first. `Pending` means the pool is
            // still alive.
            if let Poll::Ready(signal) = this.pool_drop_notifier.as_mut().poll(cx) {
                match signal {
                    // `Infallible` has no values, so this arm cannot run.
                    Ok(never) => match never {},
                    Err(_canceled) => {
                        trace!("pool closed, canceling idle interval");
                        return Poll::Ready(());
                    }
                }
            }

            ready!(this.interval.as_mut().poll_tick(cx));

            if let Some(shared) = this.pool.upgrade() {
                if let Ok(mut inner) = shared.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            // The pool is gone or its lock is poisoned: stop the task.
            return Poll::Ready(());
        }
    }
}
777
778impl<T> WeakOpt<T> {
779 fn none() -> Self {
780 WeakOpt(None)
781 }
782
783 fn downgrade(arc: &Arc<T>) -> Self {
784 WeakOpt(Some(Arc::downgrade(arc)))
785 }
786
787 fn upgrade(&self) -> Option<Arc<T>> {
788 self.0.as_ref().and_then(Weak::upgrade)
789 }
790}
791
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::exec::Exec;

    /// Test value that is always open and never shared.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    /// Builds an untracked `Connecting` for `key`.
    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    /// Builds an `http` key for the authority `s`.
    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    /// Pool with a 100ms idle timeout, no idle limit and no idle task.
    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(usize::MAX)
    }

    /// Pool with a 100ms idle timeout, the given idle limit and no idle task.
    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            &Exec::Default,
        );
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Polls the wrapped future exactly once: `Some(())` if it completed
    /// (with either result), `None` if it was still pending.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        // Copy the timeout out first so the pool's mutex guard is released
        // before the `.await` instead of being held across it.
        let timeout = pool.locked().timeout.unwrap();
        tokio::time::sleep(timeout).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        // Release the mutex guard before awaiting the sleep.
        let timeout = pool.locked().timeout.unwrap();
        tokio::time::sleep(timeout).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        poll_once.await;
        // The checkout attempt must have discarded every expired entry.
        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // Only two of the three values may be kept.
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: usize::MAX,
            },
            &Exec::Default,
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Move time past the idle timeout, then yield so the spawned idle
        // task gets a chance to run.
        tokio::time::advance(Duration::from_millis(30)).await;
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        // Dropping `pooled` from the second future has to wake the checkout.
        let checkout = join(pool.checkout(key), async {
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // Each first poll registers one waiter.
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // Dropping a checkout removes its waiter.
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    /// Test value whose open/closed state is chosen at construction.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}