tokio/sync/
broadcast.rs

1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//!     let (tx, mut rx1) = broadcast::channel(16);
79//!     let mut rx2 = tx.subscribe();
80//!
81//!     tokio::spawn(async move {
82//!         assert_eq!(rx1.recv().await.unwrap(), 10);
83//!         assert_eq!(rx1.recv().await.unwrap(), 20);
84//!     });
85//!
86//!     tokio::spawn(async move {
87//!         assert_eq!(rx2.recv().await.unwrap(), 10);
88//!         assert_eq!(rx2.recv().await.unwrap(), 20);
89//!     });
90//!
91//!     tx.send(10).unwrap();
92//!     tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//!     let (tx, mut rx) = broadcast::channel(2);
104//!
105//!     tx.send(10).unwrap();
106//!     tx.send(20).unwrap();
107//!     tx.send(30).unwrap();
108//!
109//!     // The receiver lagged behind
110//!     assert!(rx.recv().await.is_err());
111//!
112//!     // At this point, we can abort or continue with lost messages
113//!
114//!     assert_eq!(20, rx.recv().await.unwrap());
115//!     assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard};
122use crate::runtime::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146///     let (tx, mut rx1) = broadcast::channel(16);
147///     let mut rx2 = tx.subscribe();
148///
149///     tokio::spawn(async move {
150///         assert_eq!(rx1.recv().await.unwrap(), 10);
151///         assert_eq!(rx1.recv().await.unwrap(), 20);
152///     });
153///
154///     tokio::spawn(async move {
155///         assert_eq!(rx2.recv().await.unwrap(), 10);
156///         assert_eq!(rx2.recv().await.unwrap(), 20);
157///     });
158///
159///     tx.send(10).unwrap();
160///     tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
165pub struct Sender<T> {
166    shared: Arc<Shared<T>>,
167}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186///     let (tx, mut rx1) = broadcast::channel(16);
187///     let mut rx2 = tx.subscribe();
188///
189///     tokio::spawn(async move {
190///         assert_eq!(rx1.recv().await.unwrap(), 10);
191///         assert_eq!(rx1.recv().await.unwrap(), 20);
192///     });
193///
194///     tokio::spawn(async move {
195///         assert_eq!(rx2.recv().await.unwrap(), 10);
196///         assert_eq!(rx2.recv().await.unwrap(), 20);
197///     });
198///
199///     tx.send(10).unwrap();
200///     tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
205pub struct Receiver<T> {
206    /// State shared with all receivers and senders.
207    shared: Arc<Shared<T>>,
208
209    /// Next position to read from
210    next: u64,
211}
212
213pub mod error {
214    //! Broadcast error types
215
216    use std::fmt;
217
218    /// Error returned by the [`send`] function on a [`Sender`].
219    ///
220    /// A **send** operation can only fail if there are no active receivers,
221    /// implying that the message could never be received. The error contains the
222    /// message being sent as a payload so it can be recovered.
223    ///
224    /// [`send`]: crate::sync::broadcast::Sender::send
225    /// [`Sender`]: crate::sync::broadcast::Sender
226    #[derive(Debug)]
227    pub struct SendError<T>(pub T);
228
229    impl<T> fmt::Display for SendError<T> {
230        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231            write!(f, "channel closed")
232        }
233    }
234
235    impl<T: fmt::Debug> std::error::Error for SendError<T> {}
236
237    /// An error returned from the [`recv`] function on a [`Receiver`].
238    ///
239    /// [`recv`]: crate::sync::broadcast::Receiver::recv
240    /// [`Receiver`]: crate::sync::broadcast::Receiver
241    #[derive(Debug, PartialEq, Eq, Clone)]
242    pub enum RecvError {
243        /// There are no more active senders implying no further messages will ever
244        /// be sent.
245        Closed,
246
247        /// The receiver lagged too far behind. Attempting to receive again will
248        /// return the oldest message still retained by the channel.
249        ///
250        /// Includes the number of skipped messages.
251        Lagged(u64),
252    }
253
254    impl fmt::Display for RecvError {
255        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256            match self {
257                RecvError::Closed => write!(f, "channel closed"),
258                RecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
259            }
260        }
261    }
262
263    impl std::error::Error for RecvError {}
264
265    /// An error returned from the [`try_recv`] function on a [`Receiver`].
266    ///
267    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
268    /// [`Receiver`]: crate::sync::broadcast::Receiver
269    #[derive(Debug, PartialEq, Eq, Clone)]
270    pub enum TryRecvError {
271        /// The channel is currently empty. There are still active
272        /// [`Sender`] handles, so data may yet become available.
273        ///
274        /// [`Sender`]: crate::sync::broadcast::Sender
275        Empty,
276
277        /// There are no more active senders implying no further messages will ever
278        /// be sent.
279        Closed,
280
281        /// The receiver lagged too far behind and has been forcibly disconnected.
282        /// Attempting to receive again will return the oldest message still
283        /// retained by the channel.
284        ///
285        /// Includes the number of skipped messages.
286        Lagged(u64),
287    }
288
289    impl fmt::Display for TryRecvError {
290        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291            match self {
292                TryRecvError::Empty => write!(f, "channel empty"),
293                TryRecvError::Closed => write!(f, "channel closed"),
294                TryRecvError::Lagged(amt) => write!(f, "channel lagged by {amt}"),
295            }
296        }
297    }
298
299    impl std::error::Error for TryRecvError {}
300}
301
302use self::error::{RecvError, SendError, TryRecvError};
303
304/// Data shared between senders and receivers.
305struct Shared<T> {
306    /// slots in the channel.
307    buffer: Box<[Mutex<Slot<T>>]>,
308
309    /// Mask a position -> index.
310    mask: usize,
311
312    /// Tail of the queue. Includes the rx wait list.
313    tail: Mutex<Tail>,
314
315    /// Number of outstanding Sender handles.
316    num_tx: AtomicUsize,
317}
318
319/// Next position to write a value.
320struct Tail {
321    /// Next position to write to.
322    pos: u64,
323
324    /// Number of active receivers.
325    rx_cnt: usize,
326
327    /// True if the channel is closed.
328    closed: bool,
329
330    /// Receivers waiting for a value.
331    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
332}
333
334/// Slot in the buffer.
335struct Slot<T> {
336    /// Remaining number of receivers that are expected to see this value.
337    ///
338    /// When this goes to zero, the value is released.
339    ///
340    /// An atomic is used as it is mutated concurrently with the slot read lock
341    /// acquired.
342    rem: AtomicUsize,
343
344    /// Uniquely identifies the `send` stored in the slot.
345    pos: u64,
346
347    /// The value being broadcast.
348    ///
349    /// The value is set by `send` when the write lock is held. When a reader
350    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
351    val: Option<T>,
352}
353
354/// An entry in the wait queue.
355struct Waiter {
356    /// True if queued.
357    queued: AtomicBool,
358
359    /// Task waiting on the broadcast channel.
360    waker: Option<Waker>,
361
362    /// Intrusive linked-list pointers.
363    pointers: linked_list::Pointers<Waiter>,
364
365    /// Should not be `Unpin`.
366    _p: PhantomPinned,
367}
368
369impl Waiter {
370    fn new() -> Self {
371        Self {
372            queued: AtomicBool::new(false),
373            waker: None,
374            pointers: linked_list::Pointers::new(),
375            _p: PhantomPinned,
376        }
377    }
378}
379
380generate_addr_of_methods! {
381    impl<> Waiter {
382        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
383            &self.pointers
384        }
385    }
386}
387
388struct RecvGuard<'a, T> {
389    slot: MutexGuard<'a, Slot<T>>,
390}
391
392/// Receive a value future.
393struct Recv<'a, T> {
394    /// Receiver being waited on.
395    receiver: &'a mut Receiver<T>,
396
397    /// Entry in the waiter `LinkedList`.
398    waiter: WaiterCell,
399}
400
401// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
402// from `Recv`.
403struct WaiterCell(UnsafeCell<Waiter>);
404
405unsafe impl Send for WaiterCell {}
406unsafe impl Sync for WaiterCell {}
407
408/// Max number of receivers. Reserve space to lock.
409const MAX_RECEIVERS: usize = usize::MAX >> 2;
410
411/// Create a bounded, multi-producer, multi-consumer channel where each sent
412/// value is broadcasted to all active receivers.
413///
414/// **Note:** The actual capacity may be greater than the provided `capacity`.
415///
416/// All data sent on [`Sender`] will become available on every active
417/// [`Receiver`] in the same order as it was sent.
418///
419/// The `Sender` can be cloned to `send` to the same channel from multiple
420/// points in the process or it can be used concurrently from an `Arc`. New
421/// `Receiver` handles are created by calling [`Sender::subscribe`].
422///
423/// If all [`Receiver`] handles are dropped, the `send` method will return a
424/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
425/// method will return a [`RecvError`].
426///
427/// [`Sender`]: crate::sync::broadcast::Sender
428/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
429/// [`Receiver`]: crate::sync::broadcast::Receiver
430/// [`recv`]: crate::sync::broadcast::Receiver::recv
431/// [`SendError`]: crate::sync::broadcast::error::SendError
432/// [`RecvError`]: crate::sync::broadcast::error::RecvError
433///
434/// # Examples
435///
436/// ```
437/// use tokio::sync::broadcast;
438///
439/// #[tokio::main]
440/// async fn main() {
441///     let (tx, mut rx1) = broadcast::channel(16);
442///     let mut rx2 = tx.subscribe();
443///
444///     tokio::spawn(async move {
445///         assert_eq!(rx1.recv().await.unwrap(), 10);
446///         assert_eq!(rx1.recv().await.unwrap(), 20);
447///     });
448///
449///     tokio::spawn(async move {
450///         assert_eq!(rx2.recv().await.unwrap(), 10);
451///         assert_eq!(rx2.recv().await.unwrap(), 20);
452///     });
453///
454///     tx.send(10).unwrap();
455///     tx.send(20).unwrap();
456/// }
457/// ```
458///
459/// # Panics
460///
461/// This will panic if `capacity` is equal to `0` or larger
462/// than `usize::MAX / 2`.
463#[track_caller]
464pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
465    // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
466    let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
467    let rx = Receiver {
468        shared: tx.shared.clone(),
469        next: 0,
470    };
471    (tx, rx)
472}
473
474impl<T> Sender<T> {
475    /// Creates the sending-half of the [`broadcast`] channel.
476    ///
477    /// See the documentation of [`broadcast::channel`] for more information on this method.
478    ///
479    /// [`broadcast`]: crate::sync::broadcast
480    /// [`broadcast::channel`]: crate::sync::broadcast::channel
481    #[track_caller]
482    pub fn new(capacity: usize) -> Self {
483        // SAFETY: We don't create extra receivers, so there are 0.
484        unsafe { Self::new_with_receiver_count(0, capacity) }
485    }
486
487    /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
488    /// count.
489    ///
490    /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
491    /// calling this function.
492    ///
493    /// # Safety:
494    ///
495    /// The caller must ensure that the amount of receivers for this Sender is correct before
496    /// the channel functionalities are used, the count is zero by default, as this function
497    /// does not create any receivers by itself.
498    #[track_caller]
499    unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
500        assert!(capacity > 0, "broadcast channel capacity cannot be zero");
501        assert!(
502            capacity <= usize::MAX >> 1,
503            "broadcast channel capacity exceeded `usize::MAX / 2`"
504        );
505
506        // Round to a power of two
507        capacity = capacity.next_power_of_two();
508
509        let mut buffer = Vec::with_capacity(capacity);
510
511        for i in 0..capacity {
512            buffer.push(Mutex::new(Slot {
513                rem: AtomicUsize::new(0),
514                pos: (i as u64).wrapping_sub(capacity as u64),
515                val: None,
516            }));
517        }
518
519        let shared = Arc::new(Shared {
520            buffer: buffer.into_boxed_slice(),
521            mask: capacity - 1,
522            tail: Mutex::new(Tail {
523                pos: 0,
524                rx_cnt: receiver_count,
525                closed: false,
526                waiters: LinkedList::new(),
527            }),
528            num_tx: AtomicUsize::new(1),
529        });
530
531        Sender { shared }
532    }
533
534    /// Attempts to send a value to all active [`Receiver`] handles, returning
535    /// it back if it could not be sent.
536    ///
537    /// A successful send occurs when there is at least one active [`Receiver`]
538    /// handle. An unsuccessful send would be one where all associated
539    /// [`Receiver`] handles have already been dropped.
540    ///
541    /// # Return
542    ///
543    /// On success, the number of subscribed [`Receiver`] handles is returned.
544    /// This does not mean that this number of receivers will see the message as
545    /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
546    /// the message.
547    ///
548    /// # Note
549    ///
550    /// A return value of `Ok` **does not** mean that the sent value will be
551    /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
552    /// handles may be dropped before receiving the sent message.
553    ///
554    /// A return value of `Err` **does not** mean that future calls to `send`
555    /// will fail. New [`Receiver`] handles may be created by calling
556    /// [`subscribe`].
557    ///
558    /// [`Receiver`]: crate::sync::broadcast::Receiver
559    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
560    ///
561    /// # Examples
562    ///
563    /// ```
564    /// use tokio::sync::broadcast;
565    ///
566    /// #[tokio::main]
567    /// async fn main() {
568    ///     let (tx, mut rx1) = broadcast::channel(16);
569    ///     let mut rx2 = tx.subscribe();
570    ///
571    ///     tokio::spawn(async move {
572    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
573    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
574    ///     });
575    ///
576    ///     tokio::spawn(async move {
577    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
578    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
579    ///     });
580    ///
581    ///     tx.send(10).unwrap();
582    ///     tx.send(20).unwrap();
583    /// }
584    /// ```
585    pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
586        let mut tail = self.shared.tail.lock();
587
588        if tail.rx_cnt == 0 {
589            return Err(SendError(value));
590        }
591
592        // Position to write into
593        let pos = tail.pos;
594        let rem = tail.rx_cnt;
595        let idx = (pos & self.shared.mask as u64) as usize;
596
597        // Update the tail position
598        tail.pos = tail.pos.wrapping_add(1);
599
600        // Get the slot
601        let mut slot = self.shared.buffer[idx].lock();
602
603        // Track the position
604        slot.pos = pos;
605
606        // Set remaining receivers
607        slot.rem.with_mut(|v| *v = rem);
608
609        // Write the value
610        slot.val = Some(value);
611
612        // Release the slot lock before notifying the receivers.
613        drop(slot);
614
615        // Notify and release the mutex. This must happen after the slot lock is
616        // released, otherwise the writer lock bit could be cleared while another
617        // thread is in the critical section.
618        self.shared.notify_rx(tail);
619
620        Ok(rem)
621    }
622
623    /// Creates a new [`Receiver`] handle that will receive values sent **after**
624    /// this call to `subscribe`.
625    ///
626    /// # Examples
627    ///
628    /// ```
629    /// use tokio::sync::broadcast;
630    ///
631    /// #[tokio::main]
632    /// async fn main() {
633    ///     let (tx, _rx) = broadcast::channel(16);
634    ///
635    ///     // Will not be seen
636    ///     tx.send(10).unwrap();
637    ///
638    ///     let mut rx = tx.subscribe();
639    ///
640    ///     tx.send(20).unwrap();
641    ///
642    ///     let value = rx.recv().await.unwrap();
643    ///     assert_eq!(20, value);
644    /// }
645    /// ```
646    pub fn subscribe(&self) -> Receiver<T> {
647        let shared = self.shared.clone();
648        new_receiver(shared)
649    }
650
651    /// Returns the number of queued values.
652    ///
653    /// A value is queued until it has either been seen by all receivers that were alive at the time
654    /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
655    /// queue's capacity.
656    ///
657    /// # Note
658    ///
659    /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
660    /// have been evicted from the queue before being seen by all receivers.
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// use tokio::sync::broadcast;
666    ///
667    /// #[tokio::main]
668    /// async fn main() {
669    ///     let (tx, mut rx1) = broadcast::channel(16);
670    ///     let mut rx2 = tx.subscribe();
671    ///
672    ///     tx.send(10).unwrap();
673    ///     tx.send(20).unwrap();
674    ///     tx.send(30).unwrap();
675    ///
676    ///     assert_eq!(tx.len(), 3);
677    ///
678    ///     rx1.recv().await.unwrap();
679    ///
680    ///     // The len is still 3 since rx2 hasn't seen the first value yet.
681    ///     assert_eq!(tx.len(), 3);
682    ///
683    ///     rx2.recv().await.unwrap();
684    ///
685    ///     assert_eq!(tx.len(), 2);
686    /// }
687    /// ```
688    pub fn len(&self) -> usize {
689        let tail = self.shared.tail.lock();
690
691        let base_idx = (tail.pos & self.shared.mask as u64) as usize;
692        let mut low = 0;
693        let mut high = self.shared.buffer.len();
694        while low < high {
695            let mid = low + (high - low) / 2;
696            let idx = base_idx.wrapping_add(mid) & self.shared.mask;
697            if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
698                low = mid + 1;
699            } else {
700                high = mid;
701            }
702        }
703
704        self.shared.buffer.len() - low
705    }
706
707    /// Returns true if there are no queued values.
708    ///
709    /// # Examples
710    ///
711    /// ```
712    /// use tokio::sync::broadcast;
713    ///
714    /// #[tokio::main]
715    /// async fn main() {
716    ///     let (tx, mut rx1) = broadcast::channel(16);
717    ///     let mut rx2 = tx.subscribe();
718    ///
719    ///     assert!(tx.is_empty());
720    ///
721    ///     tx.send(10).unwrap();
722    ///
723    ///     assert!(!tx.is_empty());
724    ///
725    ///     rx1.recv().await.unwrap();
726    ///
727    ///     // The queue is still not empty since rx2 hasn't seen the value.
728    ///     assert!(!tx.is_empty());
729    ///
730    ///     rx2.recv().await.unwrap();
731    ///
732    ///     assert!(tx.is_empty());
733    /// }
734    /// ```
735    pub fn is_empty(&self) -> bool {
736        let tail = self.shared.tail.lock();
737
738        let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
739        self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
740    }
741
742    /// Returns the number of active receivers.
743    ///
744    /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
745    /// [`subscribe`]. These are the handles that will receive values sent on
746    /// this [`Sender`].
747    ///
748    /// # Note
749    ///
750    /// It is not guaranteed that a sent message will reach this number of
751    /// receivers. Active receivers may never call [`recv`] again before
752    /// dropping.
753    ///
754    /// [`recv`]: crate::sync::broadcast::Receiver::recv
755    /// [`Receiver`]: crate::sync::broadcast::Receiver
756    /// [`Sender`]: crate::sync::broadcast::Sender
757    /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
758    /// [`channel`]: crate::sync::broadcast::channel
759    ///
760    /// # Examples
761    ///
762    /// ```
763    /// use tokio::sync::broadcast;
764    ///
765    /// #[tokio::main]
766    /// async fn main() {
767    ///     let (tx, _rx1) = broadcast::channel(16);
768    ///
769    ///     assert_eq!(1, tx.receiver_count());
770    ///
771    ///     let mut _rx2 = tx.subscribe();
772    ///
773    ///     assert_eq!(2, tx.receiver_count());
774    ///
775    ///     tx.send(10).unwrap();
776    /// }
777    /// ```
778    pub fn receiver_count(&self) -> usize {
779        let tail = self.shared.tail.lock();
780        tail.rx_cnt
781    }
782
783    /// Returns `true` if senders belong to the same channel.
784    ///
785    /// # Examples
786    ///
787    /// ```
788    /// use tokio::sync::broadcast;
789    ///
790    /// #[tokio::main]
791    /// async fn main() {
792    ///     let (tx, _rx) = broadcast::channel::<()>(16);
793    ///     let tx2 = tx.clone();
794    ///
795    ///     assert!(tx.same_channel(&tx2));
796    ///
797    ///     let (tx3, _rx3) = broadcast::channel::<()>(16);
798    ///
799    ///     assert!(!tx3.same_channel(&tx2));
800    /// }
801    /// ```
802    pub fn same_channel(&self, other: &Self) -> bool {
803        Arc::ptr_eq(&self.shared, &other.shared)
804    }
805
806    fn close_channel(&self) {
807        let mut tail = self.shared.tail.lock();
808        tail.closed = true;
809
810        self.shared.notify_rx(tail);
811    }
812}
813
814/// Create a new `Receiver` which reads starting from the tail.
815fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
816    let mut tail = shared.tail.lock();
817
818    assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
819
820    tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
821
822    let next = tail.pos;
823
824    drop(tail);
825
826    Receiver { shared, next }
827}
828
829/// List used in `Shared::notify_rx`. It wraps a guarded linked list
830/// and gates the access to it on the `Shared.tail` mutex. It also empties
831/// the list on drop.
832struct WaitersList<'a, T> {
833    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
834    is_empty: bool,
835    shared: &'a Shared<T>,
836}
837
838impl<'a, T> Drop for WaitersList<'a, T> {
839    fn drop(&mut self) {
840        // If the list is not empty, we unlink all waiters from it.
841        // We do not wake the waiters to avoid double panics.
842        if !self.is_empty {
843            let _lock_guard = self.shared.tail.lock();
844            while self.list.pop_back().is_some() {}
845        }
846    }
847}
848
849impl<'a, T> WaitersList<'a, T> {
850    fn new(
851        unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
852        guard: Pin<&'a Waiter>,
853        shared: &'a Shared<T>,
854    ) -> Self {
855        let guard_ptr = NonNull::from(guard.get_ref());
856        let list = unguarded_list.into_guarded(guard_ptr);
857        WaitersList {
858            list,
859            is_empty: false,
860            shared,
861        }
862    }
863
864    /// Removes the last element from the guarded list. Modifying this list
865    /// requires an exclusive access to the main list in `Notify`.
866    fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
867        let result = self.list.pop_back();
868        if result.is_none() {
869            // Save information about emptiness to avoid waiting for lock
870            // in the destructor.
871            self.is_empty = true;
872        }
873        result
874    }
875}
876
877impl<T> Shared<T> {
878    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
879        // It is critical for `GuardedLinkedList` safety that the guard node is
880        // pinned in memory and is not dropped until the guarded list is dropped.
881        let guard = Waiter::new();
882        pin!(guard);
883
884        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
885        // underneath to allow every waiter to safely remove itself from it.
886        //
887        // * This list will be still guarded by the `waiters` lock.
888        //   `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
889        // * This wrapper will empty the list on drop. It is critical for safety
890        //   that we will not leave any list entry with a pointer to the local
891        //   guard node after this function returns / panics.
892        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
893
894        let mut wakers = WakeList::new();
895        'outer: loop {
896            while wakers.can_push() {
897                match list.pop_back_locked(&mut tail) {
898                    Some(waiter) => {
899                        unsafe {
900                            // Safety: accessing `waker` is safe because
901                            // the tail lock is held.
902                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
903                                wakers.push(waker);
904                            }
905
906                            // Safety: `queued` is atomic.
907                            let queued = &(*waiter.as_ptr()).queued;
908                            // `Relaxed` suffices because the tail lock is held.
909                            assert!(queued.load(Relaxed));
910                            // `Release` is needed to synchronize with `Recv::drop`.
911                            // It is critical to set this variable **after** waker
912                            // is extracted, otherwise we may data race with `Recv::drop`.
913                            queued.store(false, Release);
914                        }
915                    }
916                    None => {
917                        break 'outer;
918                    }
919                }
920            }
921
922            // Release the lock before waking.
923            drop(tail);
924
925            // Before we acquire the lock again all sorts of things can happen:
926            // some waiters may remove themselves from the list and new waiters
927            // may be added. This is fine since at worst we will unnecessarily
928            // wake up waiters which will then queue themselves again.
929
930            wakers.wake_all();
931
932            // Acquire the lock again.
933            tail = self.tail.lock();
934        }
935
936        // Release the lock before waking.
937        drop(tail);
938
939        wakers.wake_all();
940    }
941}
942
943impl<T> Clone for Sender<T> {
944    fn clone(&self) -> Sender<T> {
945        let shared = self.shared.clone();
946        shared.num_tx.fetch_add(1, SeqCst);
947
948        Sender { shared }
949    }
950}
951
952impl<T> Drop for Sender<T> {
953    fn drop(&mut self) {
954        if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
955            self.close_channel();
956        }
957    }
958}
959
960impl<T> Receiver<T> {
961    /// Returns the number of messages that were sent into the channel and that
962    /// this [`Receiver`] has yet to receive.
963    ///
964    /// If the returned value from `len` is larger than the next largest power of 2
965    /// of the capacity of the channel any call to [`recv`] will return an
966    /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
967    /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
968    /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
969    /// values larger than 16.
970    ///
971    /// [`Receiver`]: crate::sync::broadcast::Receiver
972    /// [`recv`]: crate::sync::broadcast::Receiver::recv
973    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
974    ///
975    /// # Examples
976    ///
977    /// ```
978    /// use tokio::sync::broadcast;
979    ///
980    /// #[tokio::main]
981    /// async fn main() {
982    ///     let (tx, mut rx1) = broadcast::channel(16);
983    ///
984    ///     tx.send(10).unwrap();
985    ///     tx.send(20).unwrap();
986    ///
987    ///     assert_eq!(rx1.len(), 2);
988    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
989    ///     assert_eq!(rx1.len(), 1);
990    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
991    ///     assert_eq!(rx1.len(), 0);
992    /// }
993    /// ```
994    pub fn len(&self) -> usize {
995        let next_send_pos = self.shared.tail.lock().pos;
996        (next_send_pos - self.next) as usize
997    }
998
999    /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1000    /// has yet to receive.
1001    ///
1002    /// [`Receiver]: create::sync::broadcast::Receiver
1003    ///
1004    /// # Examples
1005    ///
1006    /// ```
1007    /// use tokio::sync::broadcast;
1008    ///
1009    /// #[tokio::main]
1010    /// async fn main() {
1011    ///     let (tx, mut rx1) = broadcast::channel(16);
1012    ///
1013    ///     assert!(rx1.is_empty());
1014    ///
1015    ///     tx.send(10).unwrap();
1016    ///     tx.send(20).unwrap();
1017    ///
1018    ///     assert!(!rx1.is_empty());
1019    ///     assert_eq!(rx1.recv().await.unwrap(), 10);
1020    ///     assert_eq!(rx1.recv().await.unwrap(), 20);
1021    ///     assert!(rx1.is_empty());
1022    /// }
1023    /// ```
1024    pub fn is_empty(&self) -> bool {
1025        self.len() == 0
1026    }
1027
1028    /// Returns `true` if receivers belong to the same channel.
1029    ///
1030    /// # Examples
1031    ///
1032    /// ```
1033    /// use tokio::sync::broadcast;
1034    ///
1035    /// #[tokio::main]
1036    /// async fn main() {
1037    ///     let (tx, rx) = broadcast::channel::<()>(16);
1038    ///     let rx2 = tx.subscribe();
1039    ///
1040    ///     assert!(rx.same_channel(&rx2));
1041    ///
1042    ///     let (_tx3, rx3) = broadcast::channel::<()>(16);
1043    ///
1044    ///     assert!(!rx3.same_channel(&rx2));
1045    /// }
1046    /// ```
1047    pub fn same_channel(&self, other: &Self) -> bool {
1048        Arc::ptr_eq(&self.shared, &other.shared)
1049    }
1050
1051    /// Locks the next value if there is one.
1052    fn recv_ref(
1053        &mut self,
1054        waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1055    ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1056        let idx = (self.next & self.shared.mask as u64) as usize;
1057
1058        // The slot holding the next value to read
1059        let mut slot = self.shared.buffer[idx].lock();
1060
1061        if slot.pos != self.next {
1062            // Release the `slot` lock before attempting to acquire the `tail`
1063            // lock. This is required because `send2` acquires the tail lock
1064            // first followed by the slot lock. Acquiring the locks in reverse
1065            // order here would result in a potential deadlock: `recv_ref`
1066            // acquires the `slot` lock and attempts to acquire the `tail` lock
1067            // while `send2` acquired the `tail` lock and attempts to acquire
1068            // the slot lock.
1069            drop(slot);
1070
1071            let mut old_waker = None;
1072
1073            let mut tail = self.shared.tail.lock();
1074
1075            // Acquire slot lock again
1076            slot = self.shared.buffer[idx].lock();
1077
1078            // Make sure the position did not change. This could happen in the
1079            // unlikely event that the buffer is wrapped between dropping the
1080            // read lock and acquiring the tail lock.
1081            if slot.pos != self.next {
1082                let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1083
1084                if next_pos == self.next {
1085                    // At this point the channel is empty for *this* receiver. If
1086                    // it's been closed, then that's what we return, otherwise we
1087                    // set a waker and return empty.
1088                    if tail.closed {
1089                        return Err(TryRecvError::Closed);
1090                    }
1091
1092                    // Store the waker
1093                    if let Some((waiter, waker)) = waiter {
1094                        // Safety: called while locked.
1095                        unsafe {
1096                            // Only queue if not already queued
1097                            waiter.with_mut(|ptr| {
1098                                // If there is no waker **or** if the currently
1099                                // stored waker references a **different** task,
1100                                // track the tasks' waker to be notified on
1101                                // receipt of a new value.
1102                                match (*ptr).waker {
1103                                    Some(ref w) if w.will_wake(waker) => {}
1104                                    _ => {
1105                                        old_waker = std::mem::replace(
1106                                            &mut (*ptr).waker,
1107                                            Some(waker.clone()),
1108                                        );
1109                                    }
1110                                }
1111
1112                                // If the waiter is not already queued, enqueue it.
1113                                // `Relaxed` order suffices: we have synchronized with
1114                                // all writers through the tail lock that we hold.
1115                                if !(*ptr).queued.load(Relaxed) {
1116                                    // `Relaxed` order suffices: all the readers will
1117                                    // synchronize with this write through the tail lock.
1118                                    (*ptr).queued.store(true, Relaxed);
1119                                    tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1120                                }
1121                            });
1122                        }
1123                    }
1124
1125                    // Drop the old waker after releasing the locks.
1126                    drop(slot);
1127                    drop(tail);
1128                    drop(old_waker);
1129
1130                    return Err(TryRecvError::Empty);
1131                }
1132
1133                // At this point, the receiver has lagged behind the sender by
1134                // more than the channel capacity. The receiver will attempt to
1135                // catch up by skipping dropped messages and setting the
1136                // internal cursor to the **oldest** message stored by the
1137                // channel.
1138                let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1139
1140                let missed = next.wrapping_sub(self.next);
1141
1142                drop(tail);
1143
1144                // The receiver is slow but no values have been missed
1145                if missed == 0 {
1146                    self.next = self.next.wrapping_add(1);
1147
1148                    return Ok(RecvGuard { slot });
1149                }
1150
1151                self.next = next;
1152
1153                return Err(TryRecvError::Lagged(missed));
1154            }
1155        }
1156
1157        self.next = self.next.wrapping_add(1);
1158
1159        Ok(RecvGuard { slot })
1160    }
1161}
1162
1163impl<T: Clone> Receiver<T> {
1164    /// Re-subscribes to the channel starting from the current tail element.
1165    ///
1166    /// This [`Receiver`] handle will receive a clone of all values sent
1167    /// **after** it has resubscribed. This will not include elements that are
1168    /// in the queue of the current receiver. Consider the following example.
1169    ///
1170    /// # Examples
1171    ///
1172    /// ```
1173    /// use tokio::sync::broadcast;
1174    ///
1175    /// #[tokio::main]
1176    /// async fn main() {
1177    ///   let (tx, mut rx) = broadcast::channel(2);
1178    ///
1179    ///   tx.send(1).unwrap();
1180    ///   let mut rx2 = rx.resubscribe();
1181    ///   tx.send(2).unwrap();
1182    ///
1183    ///   assert_eq!(rx2.recv().await.unwrap(), 2);
1184    ///   assert_eq!(rx.recv().await.unwrap(), 1);
1185    /// }
1186    /// ```
1187    pub fn resubscribe(&self) -> Self {
1188        let shared = self.shared.clone();
1189        new_receiver(shared)
1190    }
1191    /// Receives the next value for this receiver.
1192    ///
1193    /// Each [`Receiver`] handle will receive a clone of all values sent
1194    /// **after** it has subscribed.
1195    ///
1196    /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1197    /// dropped, indicating that no further values can be sent on the channel.
1198    ///
1199    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1200    /// sent values will overwrite old values. At this point, a call to [`recv`]
1201    /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1202    /// internal cursor is updated to point to the oldest value still held by
1203    /// the channel. A subsequent call to [`recv`] will return this value
1204    /// **unless** it has been since overwritten.
1205    ///
1206    /// # Cancel safety
1207    ///
1208    /// This method is cancel safe. If `recv` is used as the event in a
1209    /// [`tokio::select!`](crate::select) statement and some other branch
1210    /// completes first, it is guaranteed that no messages were received on this
1211    /// channel.
1212    ///
1213    /// [`Receiver`]: crate::sync::broadcast::Receiver
1214    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1215    ///
1216    /// # Examples
1217    ///
1218    /// ```
1219    /// use tokio::sync::broadcast;
1220    ///
1221    /// #[tokio::main]
1222    /// async fn main() {
1223    ///     let (tx, mut rx1) = broadcast::channel(16);
1224    ///     let mut rx2 = tx.subscribe();
1225    ///
1226    ///     tokio::spawn(async move {
1227    ///         assert_eq!(rx1.recv().await.unwrap(), 10);
1228    ///         assert_eq!(rx1.recv().await.unwrap(), 20);
1229    ///     });
1230    ///
1231    ///     tokio::spawn(async move {
1232    ///         assert_eq!(rx2.recv().await.unwrap(), 10);
1233    ///         assert_eq!(rx2.recv().await.unwrap(), 20);
1234    ///     });
1235    ///
1236    ///     tx.send(10).unwrap();
1237    ///     tx.send(20).unwrap();
1238    /// }
1239    /// ```
1240    ///
1241    /// Handling lag
1242    ///
1243    /// ```
1244    /// use tokio::sync::broadcast;
1245    ///
1246    /// #[tokio::main]
1247    /// async fn main() {
1248    ///     let (tx, mut rx) = broadcast::channel(2);
1249    ///
1250    ///     tx.send(10).unwrap();
1251    ///     tx.send(20).unwrap();
1252    ///     tx.send(30).unwrap();
1253    ///
1254    ///     // The receiver lagged behind
1255    ///     assert!(rx.recv().await.is_err());
1256    ///
1257    ///     // At this point, we can abort or continue with lost messages
1258    ///
1259    ///     assert_eq!(20, rx.recv().await.unwrap());
1260    ///     assert_eq!(30, rx.recv().await.unwrap());
1261    /// }
1262    /// ```
1263    pub async fn recv(&mut self) -> Result<T, RecvError> {
1264        cooperative(Recv::new(self)).await
1265    }
1266
1267    /// Attempts to return a pending value on this receiver without awaiting.
1268    ///
1269    /// This is useful for a flavor of "optimistic check" before deciding to
1270    /// await on a receiver.
1271    ///
1272    /// Compared with [`recv`], this function has three failure cases instead of two
1273    /// (one for closed, one for an empty buffer, one for a lagging receiver).
1274    ///
1275    /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1276    /// dropped, indicating that no further values can be sent on the channel.
1277    ///
1278    /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1279    /// sent values will overwrite old values. At this point, a call to [`recv`]
1280    /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1281    /// internal cursor is updated to point to the oldest value still held by
1282    /// the channel. A subsequent call to [`try_recv`] will return this value
1283    /// **unless** it has been since overwritten. If there are no values to
1284    /// receive, `Err(TryRecvError::Empty)` is returned.
1285    ///
1286    /// [`recv`]: crate::sync::broadcast::Receiver::recv
1287    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1288    /// [`Receiver`]: crate::sync::broadcast::Receiver
1289    ///
1290    /// # Examples
1291    ///
1292    /// ```
1293    /// use tokio::sync::broadcast;
1294    ///
1295    /// #[tokio::main]
1296    /// async fn main() {
1297    ///     let (tx, mut rx) = broadcast::channel(16);
1298    ///
1299    ///     assert!(rx.try_recv().is_err());
1300    ///
1301    ///     tx.send(10).unwrap();
1302    ///
1303    ///     let value = rx.try_recv().unwrap();
1304    ///     assert_eq!(10, value);
1305    /// }
1306    /// ```
1307    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1308        let guard = self.recv_ref(None)?;
1309        guard.clone_value().ok_or(TryRecvError::Closed)
1310    }
1311
1312    /// Blocking receive to call outside of asynchronous contexts.
1313    ///
1314    /// # Panics
1315    ///
1316    /// This function panics if called within an asynchronous execution
1317    /// context.
1318    ///
1319    /// # Examples
1320    /// ```
1321    /// use std::thread;
1322    /// use tokio::sync::broadcast;
1323    ///
1324    /// #[tokio::main]
1325    /// async fn main() {
1326    ///     let (tx, mut rx) = broadcast::channel(16);
1327    ///
1328    ///     let sync_code = thread::spawn(move || {
1329    ///         assert_eq!(rx.blocking_recv(), Ok(10));
1330    ///     });
1331    ///
1332    ///     let _ = tx.send(10);
1333    ///     sync_code.join().unwrap();
1334    /// }
1335    /// ```
1336    pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1337        crate::future::block_on(self.recv())
1338    }
1339}
1340
1341impl<T> Drop for Receiver<T> {
1342    fn drop(&mut self) {
1343        let mut tail = self.shared.tail.lock();
1344
1345        tail.rx_cnt -= 1;
1346        let until = tail.pos;
1347
1348        drop(tail);
1349
1350        while self.next < until {
1351            match self.recv_ref(None) {
1352                Ok(_) => {}
1353                // The channel is closed
1354                Err(TryRecvError::Closed) => break,
1355                // Ignore lagging, we will catch up
1356                Err(TryRecvError::Lagged(..)) => {}
1357                // Can't be empty
1358                Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1359            }
1360        }
1361    }
1362}
1363
1364impl<'a, T> Recv<'a, T> {
1365    fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1366        Recv {
1367            receiver,
1368            waiter: WaiterCell(UnsafeCell::new(Waiter {
1369                queued: AtomicBool::new(false),
1370                waker: None,
1371                pointers: linked_list::Pointers::new(),
1372                _p: PhantomPinned,
1373            })),
1374        }
1375    }
1376
1377    /// A custom `project` implementation is used in place of `pin-project-lite`
1378    /// as a custom drop implementation is needed.
1379    fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1380        unsafe {
1381            // Safety: Receiver is Unpin
1382            is_unpin::<&mut Receiver<T>>();
1383
1384            let me = self.get_unchecked_mut();
1385            (me.receiver, &me.waiter.0)
1386        }
1387    }
1388}
1389
1390impl<'a, T> Future for Recv<'a, T>
1391where
1392    T: Clone,
1393{
1394    type Output = Result<T, RecvError>;
1395
1396    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1397        ready!(crate::trace::trace_leaf(cx));
1398
1399        let (receiver, waiter) = self.project();
1400
1401        let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1402            Ok(value) => value,
1403            Err(TryRecvError::Empty) => return Poll::Pending,
1404            Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1405            Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1406        };
1407
1408        Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1409    }
1410}
1411
1412impl<'a, T> Drop for Recv<'a, T> {
1413    fn drop(&mut self) {
1414        // Safety: `waiter.queued` is atomic.
1415        // Acquire ordering is required to synchronize with
1416        // `Shared::notify_rx` before we drop the object.
1417        let queued = self
1418            .waiter
1419            .0
1420            .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1421
1422        // If the waiter is queued, we need to unlink it from the waiters list.
1423        // If not, no further synchronization is required, since the waiter
1424        // is not in the list and, as such, is not shared with any other threads.
1425        if queued {
1426            // Acquire the tail lock. This is required for safety before accessing
1427            // the waiter node.
1428            let mut tail = self.receiver.shared.tail.lock();
1429
1430            // Safety: tail lock is held.
1431            // `Relaxed` order suffices because we hold the tail lock.
1432            let queued = self
1433                .waiter
1434                .0
1435                .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1436
1437            if queued {
1438                // Remove the node
1439                //
1440                // safety: tail lock is held and the wait node is verified to be in
1441                // the list.
1442                unsafe {
1443                    self.waiter.0.with_mut(|ptr| {
1444                        tail.waiters.remove((&mut *ptr).into());
1445                    });
1446                }
1447            }
1448        }
1449    }
1450}
1451
1452/// # Safety
1453///
1454/// `Waiter` is forced to be !Unpin.
1455unsafe impl linked_list::Link for Waiter {
1456    type Handle = NonNull<Waiter>;
1457    type Target = Waiter;
1458
1459    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1460        *handle
1461    }
1462
1463    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1464        ptr
1465    }
1466
1467    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1468        Waiter::addr_of_pointers(target)
1469    }
1470}
1471
1472impl<T> fmt::Debug for Sender<T> {
1473    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1474        write!(fmt, "broadcast::Sender")
1475    }
1476}
1477
1478impl<T> fmt::Debug for Receiver<T> {
1479    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1480        write!(fmt, "broadcast::Receiver")
1481    }
1482}
1483
1484impl<'a, T> RecvGuard<'a, T> {
1485    fn clone_value(&self) -> Option<T>
1486    where
1487        T: Clone,
1488    {
1489        self.slot.val.clone()
1490    }
1491}
1492
1493impl<'a, T> Drop for RecvGuard<'a, T> {
1494    fn drop(&mut self) {
1495        // Decrement the remaining counter
1496        if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497            self.slot.val = None;
1498        }
1499    }
1500}
1501
1502fn is_unpin<T: Unpin>() {}
1503
1504#[cfg(not(loom))]
1505#[cfg(test)]
1506mod tests {
1507    use super::*;
1508
1509    #[test]
1510    fn receiver_count_on_sender_constructor() {
1511        let sender = Sender::<i32>::new(16);
1512        assert_eq!(sender.receiver_count(), 0);
1513
1514        let rx_1 = sender.subscribe();
1515        assert_eq!(sender.receiver_count(), 1);
1516
1517        let rx_2 = rx_1.resubscribe();
1518        assert_eq!(sender.receiver_count(), 2);
1519
1520        let rx_3 = sender.subscribe();
1521        assert_eq!(sender.receiver_count(), 3);
1522
1523        drop(rx_3);
1524        drop(rx_1);
1525        assert_eq!(sender.receiver_count(), 1);
1526
1527        drop(rx_2);
1528        assert_eq!(sender.receiver_count(), 0);
1529    }
1530
1531    #[cfg(not(loom))]
1532    #[test]
1533    fn receiver_count_on_channel_constructor() {
1534        let (sender, rx) = channel::<i32>(16);
1535        assert_eq!(sender.receiver_count(), 1);
1536
1537        let _rx_2 = rx.resubscribe();
1538        assert_eq!(sender.receiver_count(), 2);
1539    }
1540}