tokio/sync/broadcast.rs
1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121use crate::loom::sync::{Arc, Mutex, MutexGuard};
122use crate::runtime::coop::cooperative;
123use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124use crate::util::WakeList;
125
126use std::fmt;
127use std::future::Future;
128use std::marker::PhantomPinned;
129use std::pin::Pin;
130use std::ptr::NonNull;
131use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
132use std::task::{ready, Context, Poll, Waker};
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
pub struct Sender<T> {
    /// State shared with every other `Sender` and `Receiver` of this channel.
    shared: Arc<Shared<T>>,
}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186/// let (tx, mut rx1) = broadcast::channel(16);
187/// let mut rx2 = tx.subscribe();
188///
189/// tokio::spawn(async move {
190/// assert_eq!(rx1.recv().await.unwrap(), 10);
191/// assert_eq!(rx1.recv().await.unwrap(), 20);
192/// });
193///
194/// tokio::spawn(async move {
195/// assert_eq!(rx2.recv().await.unwrap(), 10);
196/// assert_eq!(rx2.recv().await.unwrap(), 20);
197/// });
198///
199/// tx.send(10).unwrap();
200/// tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
pub struct Receiver<T> {
    /// State shared with all receivers and senders.
    shared: Arc<Shared<T>>,

    /// Position of the next value this receiver will read.
    ///
    /// The slot index is obtained by masking this position with the shared
    /// `mask`; a slot holds the expected value only when its stored position
    /// equals `next`.
    next: u64,
}
212
pub mod error {
    //! Broadcast error types

    use std::fmt;

    /// Error produced by [`send`] on a [`Sender`].
    ///
    /// Sending fails only when no receiver is active, meaning the message
    /// could never be delivered. The rejected message is carried as the
    /// payload so the caller can recover it.
    ///
    /// [`send`]: crate::sync::broadcast::Sender::send
    /// [`Sender`]: crate::sync::broadcast::Sender
    #[derive(Debug)]
    pub struct SendError<T>(pub T);

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl<T: fmt::Debug> std::error::Error for SendError<T> {}

    /// Error produced by [`recv`] on a [`Receiver`].
    ///
    /// [`recv`]: crate::sync::broadcast::Receiver::recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum RecvError {
        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// Carries the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for RecvError {}

    /// Error produced by [`try_recv`] on a [`Receiver`].
    ///
    /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
    /// [`Receiver`]: crate::sync::broadcast::Receiver
    #[derive(Debug, PartialEq, Eq, Clone)]
    pub enum TryRecvError {
        /// Nothing is available right now, but at least one [`Sender`] is
        /// still alive, so data may yet become available.
        ///
        /// [`Sender`]: crate::sync::broadcast::Sender
        Empty,

        /// Every sender is gone, so no further message will ever arrive.
        Closed,

        /// The receiver fell too far behind. The next receive attempt yields
        /// the oldest message the channel still retains.
        ///
        /// Carries the number of messages that were skipped.
        Lagged(u64),
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match *self {
                Self::Empty => f.write_str("channel empty"),
                Self::Closed => f.write_str("channel closed"),
                Self::Lagged(amt) => write!(f, "channel lagged by {amt}"),
            }
        }
    }

    impl std::error::Error for TryRecvError {}
}
301
302use self::error::{RecvError, SendError, TryRecvError};
303
/// Data shared between senders and receivers.
struct Shared<T> {
    /// Slots in the channel. The constructor rounds the requested capacity up
    /// to a power of two, so the length is always a power of two.
    buffer: Box<[Mutex<Slot<T>>]>,

    /// Mask a position -> index. Equal to `buffer.len() - 1`.
    mask: usize,

    /// Tail of the queue. Includes the rx wait list.
    tail: Mutex<Tail>,

    /// Number of outstanding Sender handles.
    num_tx: AtomicUsize,
}
318
/// Tail state of the queue, protected by the `Shared::tail` mutex.
struct Tail {
    /// Next position to write to. Advanced by one (wrapping) on every `send`.
    pos: u64,

    /// Number of active receivers. `send` copies it into the slot's `rem`
    /// counter for each value it stores.
    rx_cnt: usize,

    /// True if the channel is closed. Set when the last `Sender` is dropped.
    closed: bool,

    /// Receivers waiting for a value.
    waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
}
333
/// Slot in the buffer.
struct Slot<T> {
    /// Remaining number of receivers that are expected to see this value.
    ///
    /// When this goes to zero, the value is released.
    ///
    /// An atomic is used as it is mutated concurrently with the slot lock
    /// acquired.
    // NOTE(review): the original wording spoke of a "read lock"; slots are
    // wrapped in a `Mutex` in this file — confirm the decrement path.
    rem: AtomicUsize,

    /// Uniquely identifies the `send` stored in the slot.
    pos: u64,

    /// The value being broadcast.
    ///
    /// The value is set by `send` while the slot's mutex is held. When a reader
    /// drops, `rem` is decremented. When it hits zero, the value is dropped.
    val: Option<T>,
}
353
/// An entry in the wait queue.
struct Waiter {
    /// True if queued.
    ///
    /// `Shared::notify_rx` clears it with `Release` ordering after the waker
    /// has been taken out of the entry.
    queued: AtomicBool,

    /// Task waiting on the broadcast channel.
    ///
    /// `Shared::notify_rx` takes the waker while holding the tail lock.
    waker: Option<Waker>,

    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
368
369impl Waiter {
370 fn new() -> Self {
371 Self {
372 queued: AtomicBool::new(false),
373 waker: None,
374 pointers: linked_list::Pointers::new(),
375 _p: PhantomPinned,
376 }
377 }
378}
379
// Declares `Waiter::addr_of_pointers`, which maps a pointer to a `Waiter` to a
// pointer to its `pointers` field. The actual method body is produced by the
// `generate_addr_of_methods!` macro, which is not defined in this part of the
// file.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
387
/// Handle to a slot that holds a value ready to be received.
///
/// Returned by `Receiver::recv_ref`; the slot's mutex stays locked for as long
/// as the guard is alive.
struct RecvGuard<'a, T> {
    /// Lock guard of the slot being read.
    slot: MutexGuard<'a, Slot<T>>,
}
391
/// Receive a value future.
struct Recv<'a, T> {
    /// Receiver being waited on.
    receiver: &'a mut Receiver<T>,

    /// Entry in the waiter `LinkedList`.
    ///
    /// Stored behind `WaiterCell` so that the unsafe `Send`/`Sync` impls are
    /// attached to the wrapper rather than to `Recv` itself.
    waiter: WaiterCell,
}
400
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
// from `Recv`.
struct WaiterCell(UnsafeCell<Waiter>);

// NOTE(review): presumably sound because the wrapped `Waiter` is only accessed
// while the tail lock is held (see the safety comments in `Shared::notify_rx`)
// — confirm against every site that touches the cell.
unsafe impl Send for WaiterCell {}
unsafe impl Sync for WaiterCell {}
407
/// Max number of receivers. Reserve space to lock.
///
/// `new_receiver` panics with "max receivers" once the receiver count has
/// reached this value.
// NOTE(review): "reserve space to lock" presumably refers to keeping the two
// top bits of the counter free — confirm; nothing in this part of the file
// uses those bits.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
410
411/// Create a bounded, multi-producer, multi-consumer channel where each sent
412/// value is broadcasted to all active receivers.
413///
414/// **Note:** The actual capacity may be greater than the provided `capacity`.
415///
416/// All data sent on [`Sender`] will become available on every active
417/// [`Receiver`] in the same order as it was sent.
418///
419/// The `Sender` can be cloned to `send` to the same channel from multiple
420/// points in the process or it can be used concurrently from an `Arc`. New
421/// `Receiver` handles are created by calling [`Sender::subscribe`].
422///
423/// If all [`Receiver`] handles are dropped, the `send` method will return a
424/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
425/// method will return a [`RecvError`].
426///
427/// [`Sender`]: crate::sync::broadcast::Sender
428/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
429/// [`Receiver`]: crate::sync::broadcast::Receiver
430/// [`recv`]: crate::sync::broadcast::Receiver::recv
431/// [`SendError`]: crate::sync::broadcast::error::SendError
432/// [`RecvError`]: crate::sync::broadcast::error::RecvError
433///
434/// # Examples
435///
436/// ```
437/// use tokio::sync::broadcast;
438///
439/// #[tokio::main]
440/// async fn main() {
441/// let (tx, mut rx1) = broadcast::channel(16);
442/// let mut rx2 = tx.subscribe();
443///
444/// tokio::spawn(async move {
445/// assert_eq!(rx1.recv().await.unwrap(), 10);
446/// assert_eq!(rx1.recv().await.unwrap(), 20);
447/// });
448///
449/// tokio::spawn(async move {
450/// assert_eq!(rx2.recv().await.unwrap(), 10);
451/// assert_eq!(rx2.recv().await.unwrap(), 20);
452/// });
453///
454/// tx.send(10).unwrap();
455/// tx.send(20).unwrap();
456/// }
457/// ```
458///
459/// # Panics
460///
461/// This will panic if `capacity` is equal to `0` or larger
462/// than `usize::MAX / 2`.
463#[track_caller]
464pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
465 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
466 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
467 let rx = Receiver {
468 shared: tx.shared.clone(),
469 next: 0,
470 };
471 (tx, rx)
472}
473
474impl<T> Sender<T> {
475 /// Creates the sending-half of the [`broadcast`] channel.
476 ///
477 /// See the documentation of [`broadcast::channel`] for more information on this method.
478 ///
479 /// [`broadcast`]: crate::sync::broadcast
480 /// [`broadcast::channel`]: crate::sync::broadcast::channel
481 #[track_caller]
482 pub fn new(capacity: usize) -> Self {
483 // SAFETY: We don't create extra receivers, so there are 0.
484 unsafe { Self::new_with_receiver_count(0, capacity) }
485 }
486
/// Creates the sending-half of the [`broadcast`](self) channel with the given initial
/// receiver count.
489 ///
/// See the documentation of [`broadcast::channel`](self::channel) for the conditions under
/// which this function panics.
492 ///
/// # Safety
494 ///
495 /// The caller must ensure that the amount of receivers for this Sender is correct before
496 /// the channel functionalities are used, the count is zero by default, as this function
497 /// does not create any receivers by itself.
498 #[track_caller]
499 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
500 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
501 assert!(
502 capacity <= usize::MAX >> 1,
503 "broadcast channel capacity exceeded `usize::MAX / 2`"
504 );
505
506 // Round to a power of two
507 capacity = capacity.next_power_of_two();
508
509 let mut buffer = Vec::with_capacity(capacity);
510
511 for i in 0..capacity {
512 buffer.push(Mutex::new(Slot {
513 rem: AtomicUsize::new(0),
514 pos: (i as u64).wrapping_sub(capacity as u64),
515 val: None,
516 }));
517 }
518
519 let shared = Arc::new(Shared {
520 buffer: buffer.into_boxed_slice(),
521 mask: capacity - 1,
522 tail: Mutex::new(Tail {
523 pos: 0,
524 rx_cnt: receiver_count,
525 closed: false,
526 waiters: LinkedList::new(),
527 }),
528 num_tx: AtomicUsize::new(1),
529 });
530
531 Sender { shared }
532 }
533
534 /// Attempts to send a value to all active [`Receiver`] handles, returning
535 /// it back if it could not be sent.
536 ///
537 /// A successful send occurs when there is at least one active [`Receiver`]
538 /// handle. An unsuccessful send would be one where all associated
539 /// [`Receiver`] handles have already been dropped.
540 ///
541 /// # Return
542 ///
543 /// On success, the number of subscribed [`Receiver`] handles is returned.
544 /// This does not mean that this number of receivers will see the message as
545 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
546 /// the message.
547 ///
548 /// # Note
549 ///
550 /// A return value of `Ok` **does not** mean that the sent value will be
551 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
552 /// handles may be dropped before receiving the sent message.
553 ///
554 /// A return value of `Err` **does not** mean that future calls to `send`
555 /// will fail. New [`Receiver`] handles may be created by calling
556 /// [`subscribe`].
557 ///
558 /// [`Receiver`]: crate::sync::broadcast::Receiver
559 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
560 ///
561 /// # Examples
562 ///
563 /// ```
564 /// use tokio::sync::broadcast;
565 ///
566 /// #[tokio::main]
567 /// async fn main() {
568 /// let (tx, mut rx1) = broadcast::channel(16);
569 /// let mut rx2 = tx.subscribe();
570 ///
571 /// tokio::spawn(async move {
572 /// assert_eq!(rx1.recv().await.unwrap(), 10);
573 /// assert_eq!(rx1.recv().await.unwrap(), 20);
574 /// });
575 ///
576 /// tokio::spawn(async move {
577 /// assert_eq!(rx2.recv().await.unwrap(), 10);
578 /// assert_eq!(rx2.recv().await.unwrap(), 20);
579 /// });
580 ///
581 /// tx.send(10).unwrap();
582 /// tx.send(20).unwrap();
583 /// }
584 /// ```
585 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
586 let mut tail = self.shared.tail.lock();
587
588 if tail.rx_cnt == 0 {
589 return Err(SendError(value));
590 }
591
592 // Position to write into
593 let pos = tail.pos;
594 let rem = tail.rx_cnt;
595 let idx = (pos & self.shared.mask as u64) as usize;
596
597 // Update the tail position
598 tail.pos = tail.pos.wrapping_add(1);
599
600 // Get the slot
601 let mut slot = self.shared.buffer[idx].lock();
602
603 // Track the position
604 slot.pos = pos;
605
606 // Set remaining receivers
607 slot.rem.with_mut(|v| *v = rem);
608
609 // Write the value
610 slot.val = Some(value);
611
612 // Release the slot lock before notifying the receivers.
613 drop(slot);
614
615 // Notify and release the mutex. This must happen after the slot lock is
616 // released, otherwise the writer lock bit could be cleared while another
617 // thread is in the critical section.
618 self.shared.notify_rx(tail);
619
620 Ok(rem)
621 }
622
623 /// Creates a new [`Receiver`] handle that will receive values sent **after**
624 /// this call to `subscribe`.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// use tokio::sync::broadcast;
630 ///
631 /// #[tokio::main]
632 /// async fn main() {
633 /// let (tx, _rx) = broadcast::channel(16);
634 ///
635 /// // Will not be seen
636 /// tx.send(10).unwrap();
637 ///
638 /// let mut rx = tx.subscribe();
639 ///
640 /// tx.send(20).unwrap();
641 ///
642 /// let value = rx.recv().await.unwrap();
643 /// assert_eq!(20, value);
644 /// }
645 /// ```
646 pub fn subscribe(&self) -> Receiver<T> {
647 let shared = self.shared.clone();
648 new_receiver(shared)
649 }
650
651 /// Returns the number of queued values.
652 ///
653 /// A value is queued until it has either been seen by all receivers that were alive at the time
654 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
655 /// queue's capacity.
656 ///
657 /// # Note
658 ///
659 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
660 /// have been evicted from the queue before being seen by all receivers.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// use tokio::sync::broadcast;
666 ///
667 /// #[tokio::main]
668 /// async fn main() {
669 /// let (tx, mut rx1) = broadcast::channel(16);
670 /// let mut rx2 = tx.subscribe();
671 ///
672 /// tx.send(10).unwrap();
673 /// tx.send(20).unwrap();
674 /// tx.send(30).unwrap();
675 ///
676 /// assert_eq!(tx.len(), 3);
677 ///
678 /// rx1.recv().await.unwrap();
679 ///
680 /// // The len is still 3 since rx2 hasn't seen the first value yet.
681 /// assert_eq!(tx.len(), 3);
682 ///
683 /// rx2.recv().await.unwrap();
684 ///
685 /// assert_eq!(tx.len(), 2);
686 /// }
687 /// ```
688 pub fn len(&self) -> usize {
689 let tail = self.shared.tail.lock();
690
691 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
692 let mut low = 0;
693 let mut high = self.shared.buffer.len();
694 while low < high {
695 let mid = low + (high - low) / 2;
696 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
697 if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
698 low = mid + 1;
699 } else {
700 high = mid;
701 }
702 }
703
704 self.shared.buffer.len() - low
705 }
706
707 /// Returns true if there are no queued values.
708 ///
709 /// # Examples
710 ///
711 /// ```
712 /// use tokio::sync::broadcast;
713 ///
714 /// #[tokio::main]
715 /// async fn main() {
716 /// let (tx, mut rx1) = broadcast::channel(16);
717 /// let mut rx2 = tx.subscribe();
718 ///
719 /// assert!(tx.is_empty());
720 ///
721 /// tx.send(10).unwrap();
722 ///
723 /// assert!(!tx.is_empty());
724 ///
725 /// rx1.recv().await.unwrap();
726 ///
727 /// // The queue is still not empty since rx2 hasn't seen the value.
728 /// assert!(!tx.is_empty());
729 ///
730 /// rx2.recv().await.unwrap();
731 ///
732 /// assert!(tx.is_empty());
733 /// }
734 /// ```
735 pub fn is_empty(&self) -> bool {
736 let tail = self.shared.tail.lock();
737
738 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
739 self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
740 }
741
742 /// Returns the number of active receivers.
743 ///
744 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
745 /// [`subscribe`]. These are the handles that will receive values sent on
746 /// this [`Sender`].
747 ///
748 /// # Note
749 ///
750 /// It is not guaranteed that a sent message will reach this number of
751 /// receivers. Active receivers may never call [`recv`] again before
752 /// dropping.
753 ///
754 /// [`recv`]: crate::sync::broadcast::Receiver::recv
755 /// [`Receiver`]: crate::sync::broadcast::Receiver
756 /// [`Sender`]: crate::sync::broadcast::Sender
757 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
758 /// [`channel`]: crate::sync::broadcast::channel
759 ///
760 /// # Examples
761 ///
762 /// ```
763 /// use tokio::sync::broadcast;
764 ///
765 /// #[tokio::main]
766 /// async fn main() {
767 /// let (tx, _rx1) = broadcast::channel(16);
768 ///
769 /// assert_eq!(1, tx.receiver_count());
770 ///
771 /// let mut _rx2 = tx.subscribe();
772 ///
773 /// assert_eq!(2, tx.receiver_count());
774 ///
775 /// tx.send(10).unwrap();
776 /// }
777 /// ```
778 pub fn receiver_count(&self) -> usize {
779 let tail = self.shared.tail.lock();
780 tail.rx_cnt
781 }
782
783 /// Returns `true` if senders belong to the same channel.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use tokio::sync::broadcast;
789 ///
790 /// #[tokio::main]
791 /// async fn main() {
792 /// let (tx, _rx) = broadcast::channel::<()>(16);
793 /// let tx2 = tx.clone();
794 ///
795 /// assert!(tx.same_channel(&tx2));
796 ///
797 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
798 ///
799 /// assert!(!tx3.same_channel(&tx2));
800 /// }
801 /// ```
802 pub fn same_channel(&self, other: &Self) -> bool {
803 Arc::ptr_eq(&self.shared, &other.shared)
804 }
805
806 fn close_channel(&self) {
807 let mut tail = self.shared.tail.lock();
808 tail.closed = true;
809
810 self.shared.notify_rx(tail);
811 }
812}
813
814/// Create a new `Receiver` which reads starting from the tail.
815fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
816 let mut tail = shared.tail.lock();
817
818 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
819
820 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
821
822 let next = tail.pos;
823
824 drop(tail);
825
826 Receiver { shared, next }
827}
828
/// List used in `Shared::notify_rx`. It wraps a guarded linked list
/// and gates the access to it on the `Shared.tail` mutex. It also empties
/// the list on drop.
struct WaitersList<'a, T> {
    /// The guarded list holding the waiters taken out of `Tail::waiters`.
    list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set once a pop returned `None`, so that `Drop` can skip re-acquiring
    /// the tail lock when there is nothing left to unlink.
    is_empty: bool,
    /// Channel state whose tail mutex protects `list`.
    shared: &'a Shared<T>,
}
837
838impl<'a, T> Drop for WaitersList<'a, T> {
839 fn drop(&mut self) {
840 // If the list is not empty, we unlink all waiters from it.
841 // We do not wake the waiters to avoid double panics.
842 if !self.is_empty {
843 let _lock_guard = self.shared.tail.lock();
844 while self.list.pop_back().is_some() {}
845 }
846 }
847}
848
849impl<'a, T> WaitersList<'a, T> {
850 fn new(
851 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
852 guard: Pin<&'a Waiter>,
853 shared: &'a Shared<T>,
854 ) -> Self {
855 let guard_ptr = NonNull::from(guard.get_ref());
856 let list = unguarded_list.into_guarded(guard_ptr);
857 WaitersList {
858 list,
859 is_empty: false,
860 shared,
861 }
862 }
863
864 /// Removes the last element from the guarded list. Modifying this list
865 /// requires an exclusive access to the main list in `Notify`.
866 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
867 let result = self.list.pop_back();
868 if result.is_none() {
869 // Save information about emptiness to avoid waiting for lock
870 // in the destructor.
871 self.is_empty = true;
872 }
873 result
874 }
875}
876
impl<T> Shared<T> {
    /// Wakes every receiver that is queued in `tail.waiters`.
    ///
    /// Takes ownership of the tail lock guard. Wakers are gathered in batches
    /// (as many as `WakeList` accepts) while the lock is held and are invoked
    /// only after the lock has been released; the lock is then re-acquired for
    /// the next batch. The guard is dropped before the function returns.
    fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
        // It is critical for `GuardedLinkedList` safety that the guard node is
        // pinned in memory and is not dropped until the guarded list is dropped.
        let guard = Waiter::new();
        pin!(guard);

        // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
        // underneath to allow every waiter to safely remove itself from it.
        //
        // * This list will be still guarded by the `tail` lock.
        //   `WaitersList` wrapper makes sure we hold the lock to modify it.
        // * This wrapper will empty the list on drop. It is critical for safety
        //   that we will not leave any list entry with a pointer to the local
        //   guard node after this function returns / panics.
        let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);

        let mut wakers = WakeList::new();
        'outer: loop {
            // Collect wakers until the batch is full or the list runs dry.
            while wakers.can_push() {
                match list.pop_back_locked(&mut tail) {
                    Some(waiter) => {
                        unsafe {
                            // Safety: accessing `waker` is safe because
                            // the tail lock is held.
                            if let Some(waker) = (*waiter.as_ptr()).waker.take() {
                                wakers.push(waker);
                            }

                            // Safety: `queued` is atomic.
                            let queued = &(*waiter.as_ptr()).queued;
                            // `Relaxed` suffices because the tail lock is held.
                            assert!(queued.load(Relaxed));
                            // `Release` is needed to synchronize with `Recv::drop`.
                            // It is critical to set this variable **after** waker
                            // is extracted, otherwise we may data race with `Recv::drop`.
                            queued.store(false, Release);
                        }
                    }
                    None => {
                        // Every waiter has been processed.
                        break 'outer;
                    }
                }
            }

            // Release the lock before waking.
            drop(tail);

            // Before we acquire the lock again all sorts of things can happen:
            // some waiters may remove themselves from the list and new waiters
            // may be added. This is fine since at worst we will unnecessarily
            // wake up waiters which will then queue themselves again.

            wakers.wake_all();

            // Acquire the lock again.
            tail = self.tail.lock();
        }

        // Release the lock before waking.
        drop(tail);

        // Wake whatever is left in the final, possibly partial, batch.
        wakers.wake_all();
    }
}
942
943impl<T> Clone for Sender<T> {
944 fn clone(&self) -> Sender<T> {
945 let shared = self.shared.clone();
946 shared.num_tx.fetch_add(1, SeqCst);
947
948 Sender { shared }
949 }
950}
951
952impl<T> Drop for Sender<T> {
953 fn drop(&mut self) {
954 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
955 self.close_channel();
956 }
957 }
958}
959
960impl<T> Receiver<T> {
961 /// Returns the number of messages that were sent into the channel and that
962 /// this [`Receiver`] has yet to receive.
963 ///
964 /// If the returned value from `len` is larger than the next largest power of 2
965 /// of the capacity of the channel any call to [`recv`] will return an
966 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
967 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
968 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
969 /// values larger than 16.
970 ///
971 /// [`Receiver`]: crate::sync::broadcast::Receiver
972 /// [`recv`]: crate::sync::broadcast::Receiver::recv
973 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
974 ///
975 /// # Examples
976 ///
977 /// ```
978 /// use tokio::sync::broadcast;
979 ///
980 /// #[tokio::main]
981 /// async fn main() {
982 /// let (tx, mut rx1) = broadcast::channel(16);
983 ///
984 /// tx.send(10).unwrap();
985 /// tx.send(20).unwrap();
986 ///
987 /// assert_eq!(rx1.len(), 2);
988 /// assert_eq!(rx1.recv().await.unwrap(), 10);
989 /// assert_eq!(rx1.len(), 1);
990 /// assert_eq!(rx1.recv().await.unwrap(), 20);
991 /// assert_eq!(rx1.len(), 0);
992 /// }
993 /// ```
994 pub fn len(&self) -> usize {
995 let next_send_pos = self.shared.tail.lock().pos;
996 (next_send_pos - self.next) as usize
997 }
998
999 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
1000 /// has yet to receive.
1001 ///
/// [`Receiver`]: crate::sync::broadcast::Receiver
1003 ///
1004 /// # Examples
1005 ///
1006 /// ```
1007 /// use tokio::sync::broadcast;
1008 ///
1009 /// #[tokio::main]
1010 /// async fn main() {
1011 /// let (tx, mut rx1) = broadcast::channel(16);
1012 ///
1013 /// assert!(rx1.is_empty());
1014 ///
1015 /// tx.send(10).unwrap();
1016 /// tx.send(20).unwrap();
1017 ///
1018 /// assert!(!rx1.is_empty());
1019 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1020 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1021 /// assert!(rx1.is_empty());
1022 /// }
1023 /// ```
1024 pub fn is_empty(&self) -> bool {
1025 self.len() == 0
1026 }
1027
1028 /// Returns `true` if receivers belong to the same channel.
1029 ///
1030 /// # Examples
1031 ///
1032 /// ```
1033 /// use tokio::sync::broadcast;
1034 ///
1035 /// #[tokio::main]
1036 /// async fn main() {
1037 /// let (tx, rx) = broadcast::channel::<()>(16);
1038 /// let rx2 = tx.subscribe();
1039 ///
1040 /// assert!(rx.same_channel(&rx2));
1041 ///
1042 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1043 ///
1044 /// assert!(!rx3.same_channel(&rx2));
1045 /// }
1046 /// ```
1047 pub fn same_channel(&self, other: &Self) -> bool {
1048 Arc::ptr_eq(&self.shared, &other.shared)
1049 }
1050
1051 /// Locks the next value if there is one.
1052 fn recv_ref(
1053 &mut self,
1054 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1055 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1056 let idx = (self.next & self.shared.mask as u64) as usize;
1057
1058 // The slot holding the next value to read
1059 let mut slot = self.shared.buffer[idx].lock();
1060
1061 if slot.pos != self.next {
1062 // Release the `slot` lock before attempting to acquire the `tail`
1063 // lock. This is required because `send2` acquires the tail lock
1064 // first followed by the slot lock. Acquiring the locks in reverse
1065 // order here would result in a potential deadlock: `recv_ref`
1066 // acquires the `slot` lock and attempts to acquire the `tail` lock
1067 // while `send2` acquired the `tail` lock and attempts to acquire
1068 // the slot lock.
1069 drop(slot);
1070
1071 let mut old_waker = None;
1072
1073 let mut tail = self.shared.tail.lock();
1074
1075 // Acquire slot lock again
1076 slot = self.shared.buffer[idx].lock();
1077
1078 // Make sure the position did not change. This could happen in the
1079 // unlikely event that the buffer is wrapped between dropping the
1080 // read lock and acquiring the tail lock.
1081 if slot.pos != self.next {
1082 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1083
1084 if next_pos == self.next {
1085 // At this point the channel is empty for *this* receiver. If
1086 // it's been closed, then that's what we return, otherwise we
1087 // set a waker and return empty.
1088 if tail.closed {
1089 return Err(TryRecvError::Closed);
1090 }
1091
1092 // Store the waker
1093 if let Some((waiter, waker)) = waiter {
1094 // Safety: called while locked.
1095 unsafe {
1096 // Only queue if not already queued
1097 waiter.with_mut(|ptr| {
1098 // If there is no waker **or** if the currently
1099 // stored waker references a **different** task,
1100 // track the tasks' waker to be notified on
1101 // receipt of a new value.
1102 match (*ptr).waker {
1103 Some(ref w) if w.will_wake(waker) => {}
1104 _ => {
1105 old_waker = std::mem::replace(
1106 &mut (*ptr).waker,
1107 Some(waker.clone()),
1108 );
1109 }
1110 }
1111
1112 // If the waiter is not already queued, enqueue it.
1113 // `Relaxed` order suffices: we have synchronized with
1114 // all writers through the tail lock that we hold.
1115 if !(*ptr).queued.load(Relaxed) {
1116 // `Relaxed` order suffices: all the readers will
1117 // synchronize with this write through the tail lock.
1118 (*ptr).queued.store(true, Relaxed);
1119 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1120 }
1121 });
1122 }
1123 }
1124
1125 // Drop the old waker after releasing the locks.
1126 drop(slot);
1127 drop(tail);
1128 drop(old_waker);
1129
1130 return Err(TryRecvError::Empty);
1131 }
1132
1133 // At this point, the receiver has lagged behind the sender by
1134 // more than the channel capacity. The receiver will attempt to
1135 // catch up by skipping dropped messages and setting the
1136 // internal cursor to the **oldest** message stored by the
1137 // channel.
1138 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1139
1140 let missed = next.wrapping_sub(self.next);
1141
1142 drop(tail);
1143
1144 // The receiver is slow but no values have been missed
1145 if missed == 0 {
1146 self.next = self.next.wrapping_add(1);
1147
1148 return Ok(RecvGuard { slot });
1149 }
1150
1151 self.next = next;
1152
1153 return Err(TryRecvError::Lagged(missed));
1154 }
1155 }
1156
1157 self.next = self.next.wrapping_add(1);
1158
1159 Ok(RecvGuard { slot })
1160 }
1161}
1162
1163impl<T: Clone> Receiver<T> {
1164 /// Re-subscribes to the channel starting from the current tail element.
1165 ///
1166 /// This [`Receiver`] handle will receive a clone of all values sent
1167 /// **after** it has resubscribed. This will not include elements that are
1168 /// in the queue of the current receiver. Consider the following example.
1169 ///
1170 /// # Examples
1171 ///
1172 /// ```
1173 /// use tokio::sync::broadcast;
1174 ///
1175 /// #[tokio::main]
1176 /// async fn main() {
1177 /// let (tx, mut rx) = broadcast::channel(2);
1178 ///
1179 /// tx.send(1).unwrap();
1180 /// let mut rx2 = rx.resubscribe();
1181 /// tx.send(2).unwrap();
1182 ///
1183 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1184 /// assert_eq!(rx.recv().await.unwrap(), 1);
1185 /// }
1186 /// ```
1187 pub fn resubscribe(&self) -> Self {
1188 let shared = self.shared.clone();
1189 new_receiver(shared)
1190 }
1191 /// Receives the next value for this receiver.
1192 ///
1193 /// Each [`Receiver`] handle will receive a clone of all values sent
1194 /// **after** it has subscribed.
1195 ///
1196 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1197 /// dropped, indicating that no further values can be sent on the channel.
1198 ///
1199 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1200 /// sent values will overwrite old values. At this point, a call to [`recv`]
1201 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1202 /// internal cursor is updated to point to the oldest value still held by
1203 /// the channel. A subsequent call to [`recv`] will return this value
1204 /// **unless** it has been since overwritten.
1205 ///
1206 /// # Cancel safety
1207 ///
1208 /// This method is cancel safe. If `recv` is used as the event in a
1209 /// [`tokio::select!`](crate::select) statement and some other branch
1210 /// completes first, it is guaranteed that no messages were received on this
1211 /// channel.
1212 ///
1213 /// [`Receiver`]: crate::sync::broadcast::Receiver
1214 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1215 ///
1216 /// # Examples
1217 ///
1218 /// ```
1219 /// use tokio::sync::broadcast;
1220 ///
1221 /// #[tokio::main]
1222 /// async fn main() {
1223 /// let (tx, mut rx1) = broadcast::channel(16);
1224 /// let mut rx2 = tx.subscribe();
1225 ///
1226 /// tokio::spawn(async move {
1227 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1228 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1229 /// });
1230 ///
1231 /// tokio::spawn(async move {
1232 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1233 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1234 /// });
1235 ///
1236 /// tx.send(10).unwrap();
1237 /// tx.send(20).unwrap();
1238 /// }
1239 /// ```
1240 ///
1241 /// Handling lag
1242 ///
1243 /// ```
1244 /// use tokio::sync::broadcast;
1245 ///
1246 /// #[tokio::main]
1247 /// async fn main() {
1248 /// let (tx, mut rx) = broadcast::channel(2);
1249 ///
1250 /// tx.send(10).unwrap();
1251 /// tx.send(20).unwrap();
1252 /// tx.send(30).unwrap();
1253 ///
1254 /// // The receiver lagged behind
1255 /// assert!(rx.recv().await.is_err());
1256 ///
1257 /// // At this point, we can abort or continue with lost messages
1258 ///
1259 /// assert_eq!(20, rx.recv().await.unwrap());
1260 /// assert_eq!(30, rx.recv().await.unwrap());
1261 /// }
1262 /// ```
1263 pub async fn recv(&mut self) -> Result<T, RecvError> {
1264 cooperative(Recv::new(self)).await
1265 }
1266
1267 /// Attempts to return a pending value on this receiver without awaiting.
1268 ///
1269 /// This is useful for a flavor of "optimistic check" before deciding to
1270 /// await on a receiver.
1271 ///
1272 /// Compared with [`recv`], this function has three failure cases instead of two
1273 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1274 ///
1275 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1276 /// dropped, indicating that no further values can be sent on the channel.
1277 ///
1278 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1279 /// sent values will overwrite old values. At this point, a call to [`recv`]
1280 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1281 /// internal cursor is updated to point to the oldest value still held by
1282 /// the channel. A subsequent call to [`try_recv`] will return this value
1283 /// **unless** it has been since overwritten. If there are no values to
1284 /// receive, `Err(TryRecvError::Empty)` is returned.
1285 ///
1286 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1287 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1288 /// [`Receiver`]: crate::sync::broadcast::Receiver
1289 ///
1290 /// # Examples
1291 ///
1292 /// ```
1293 /// use tokio::sync::broadcast;
1294 ///
1295 /// #[tokio::main]
1296 /// async fn main() {
1297 /// let (tx, mut rx) = broadcast::channel(16);
1298 ///
1299 /// assert!(rx.try_recv().is_err());
1300 ///
1301 /// tx.send(10).unwrap();
1302 ///
1303 /// let value = rx.try_recv().unwrap();
1304 /// assert_eq!(10, value);
1305 /// }
1306 /// ```
1307 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1308 let guard = self.recv_ref(None)?;
1309 guard.clone_value().ok_or(TryRecvError::Closed)
1310 }
1311
1312 /// Blocking receive to call outside of asynchronous contexts.
1313 ///
1314 /// # Panics
1315 ///
1316 /// This function panics if called within an asynchronous execution
1317 /// context.
1318 ///
1319 /// # Examples
1320 /// ```
1321 /// use std::thread;
1322 /// use tokio::sync::broadcast;
1323 ///
1324 /// #[tokio::main]
1325 /// async fn main() {
1326 /// let (tx, mut rx) = broadcast::channel(16);
1327 ///
1328 /// let sync_code = thread::spawn(move || {
1329 /// assert_eq!(rx.blocking_recv(), Ok(10));
1330 /// });
1331 ///
1332 /// let _ = tx.send(10);
1333 /// sync_code.join().unwrap();
1334 /// }
1335 /// ```
1336 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1337 crate::future::block_on(self.recv())
1338 }
1339}
1340
1341impl<T> Drop for Receiver<T> {
1342 fn drop(&mut self) {
1343 let mut tail = self.shared.tail.lock();
1344
1345 tail.rx_cnt -= 1;
1346 let until = tail.pos;
1347
1348 drop(tail);
1349
1350 while self.next < until {
1351 match self.recv_ref(None) {
1352 Ok(_) => {}
1353 // The channel is closed
1354 Err(TryRecvError::Closed) => break,
1355 // Ignore lagging, we will catch up
1356 Err(TryRecvError::Lagged(..)) => {}
1357 // Can't be empty
1358 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1359 }
1360 }
1361 }
1362}
1363
1364impl<'a, T> Recv<'a, T> {
1365 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1366 Recv {
1367 receiver,
1368 waiter: WaiterCell(UnsafeCell::new(Waiter {
1369 queued: AtomicBool::new(false),
1370 waker: None,
1371 pointers: linked_list::Pointers::new(),
1372 _p: PhantomPinned,
1373 })),
1374 }
1375 }
1376
1377 /// A custom `project` implementation is used in place of `pin-project-lite`
1378 /// as a custom drop implementation is needed.
1379 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1380 unsafe {
1381 // Safety: Receiver is Unpin
1382 is_unpin::<&mut Receiver<T>>();
1383
1384 let me = self.get_unchecked_mut();
1385 (me.receiver, &me.waiter.0)
1386 }
1387 }
1388}
1389
1390impl<'a, T> Future for Recv<'a, T>
1391where
1392 T: Clone,
1393{
1394 type Output = Result<T, RecvError>;
1395
1396 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1397 ready!(crate::trace::trace_leaf(cx));
1398
1399 let (receiver, waiter) = self.project();
1400
1401 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1402 Ok(value) => value,
1403 Err(TryRecvError::Empty) => return Poll::Pending,
1404 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1405 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1406 };
1407
1408 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1409 }
1410}
1411
1412impl<'a, T> Drop for Recv<'a, T> {
1413 fn drop(&mut self) {
1414 // Safety: `waiter.queued` is atomic.
1415 // Acquire ordering is required to synchronize with
1416 // `Shared::notify_rx` before we drop the object.
1417 let queued = self
1418 .waiter
1419 .0
1420 .with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
1421
1422 // If the waiter is queued, we need to unlink it from the waiters list.
1423 // If not, no further synchronization is required, since the waiter
1424 // is not in the list and, as such, is not shared with any other threads.
1425 if queued {
1426 // Acquire the tail lock. This is required for safety before accessing
1427 // the waiter node.
1428 let mut tail = self.receiver.shared.tail.lock();
1429
1430 // Safety: tail lock is held.
1431 // `Relaxed` order suffices because we hold the tail lock.
1432 let queued = self
1433 .waiter
1434 .0
1435 .with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
1436
1437 if queued {
1438 // Remove the node
1439 //
1440 // safety: tail lock is held and the wait node is verified to be in
1441 // the list.
1442 unsafe {
1443 self.waiter.0.with_mut(|ptr| {
1444 tail.waiters.remove((&mut *ptr).into());
1445 });
1446 }
1447 }
1448 }
1449 }
1450}
1451
1452/// # Safety
1453///
1454/// `Waiter` is forced to be !Unpin.
1455unsafe impl linked_list::Link for Waiter {
1456 type Handle = NonNull<Waiter>;
1457 type Target = Waiter;
1458
1459 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1460 *handle
1461 }
1462
1463 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1464 ptr
1465 }
1466
1467 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1468 Waiter::addr_of_pointers(target)
1469 }
1470}
1471
1472impl<T> fmt::Debug for Sender<T> {
1473 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1474 write!(fmt, "broadcast::Sender")
1475 }
1476}
1477
1478impl<T> fmt::Debug for Receiver<T> {
1479 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1480 write!(fmt, "broadcast::Receiver")
1481 }
1482}
1483
1484impl<'a, T> RecvGuard<'a, T> {
1485 fn clone_value(&self) -> Option<T>
1486 where
1487 T: Clone,
1488 {
1489 self.slot.val.clone()
1490 }
1491}
1492
1493impl<'a, T> Drop for RecvGuard<'a, T> {
1494 fn drop(&mut self) {
1495 // Decrement the remaining counter
1496 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497 self.slot.val = None;
1498 }
1499 }
1500}
1501
/// Compile-time assertion that `T` implements `Unpin`; does nothing at runtime.
fn is_unpin<T>()
where
    T: Unpin,
{
}
1503
#[cfg(all(test, not(loom)))]
mod tests {
    use super::*;

    /// A sender built with `Sender::new` starts with no receivers; the count
    /// follows every subscribe, resubscribe and drop.
    #[test]
    fn receiver_count_on_sender_constructor() {
        let tx = Sender::<i32>::new(16);
        assert_eq!(tx.receiver_count(), 0);

        let first = tx.subscribe();
        assert_eq!(tx.receiver_count(), 1);

        let second = first.resubscribe();
        assert_eq!(tx.receiver_count(), 2);

        let third = tx.subscribe();
        assert_eq!(tx.receiver_count(), 3);

        drop(third);
        drop(first);
        assert_eq!(tx.receiver_count(), 1);

        drop(second);
        assert_eq!(tx.receiver_count(), 0);
    }

    /// `channel` hands out one receiver up front, and resubscribing from it
    /// adds another.
    #[test]
    fn receiver_count_on_channel_constructor() {
        let (tx, rx) = channel::<i32>(16);
        assert_eq!(tx.receiver_count(), 1);

        let _second = rx.resubscribe();
        assert_eq!(tx.receiver_count(), 2);
    }
}