tokio::sync::mpsc

Struct Sender

pub struct Sender<T> { /* private fields */ }

Sends values to the associated Receiver.

Instances are created by the channel function.

To convert the Sender into a Sink or use it in a poll function, you can use the PollSender utility.

Implementations§

impl<T> Sender<T>

pub async fn send(&self, value: T) -> Result<(), SendError<T>>

Sends a value, waiting until there is capacity.

A successful send occurs when it is determined that the other end of the channel has not hung up already. An unsuccessful send would be one where the corresponding receiver has already been closed. Note that a return value of Err means that the data will never be received, but a return value of Ok does not mean that the data will be received. It is possible for the corresponding receiver to hang up immediately after this function returns Ok.

§Errors

If the receive half of the channel is closed, either due to close being called or the Receiver handle dropping, the function returns an error. The error includes the value passed to send.

§Cancel safety

If send is used as the event in a tokio::select! statement and some other branch completes first, then it is guaranteed that the message was not sent. However, in that case, the message is dropped and will be lost.

To avoid losing messages, use reserve to reserve capacity, then use the returned Permit to send the message.

This channel uses a queue to ensure that calls to send and reserve complete in the order they were requested. Cancelling a call to send makes you lose your place in the queue.

§Examples

In the following example, each call to send waits until the previously sent value has been received.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        for i in 0..10 {
            if let Err(_) = tx.send(i).await {
                println!("receiver dropped");
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got = {}", i);
    }
}

pub async fn closed(&self)

Completes when the receiver has been dropped or closed.

This allows the producers to get notified when interest in the produced values is canceled and immediately stop doing work.

§Cancel safety

This method is cancel safe. Once the channel is closed, it stays closed forever and all future calls to closed will return immediately.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, rx) = mpsc::channel::<()>(1);
    let tx2 = tx1.clone();
    let tx3 = tx1.clone();
    let tx4 = tx1.clone();
    let tx5 = tx1.clone();
    tokio::spawn(async move {
        drop(rx);
    });

    tokio::join!(
        tx1.closed(),
        tx2.closed(),
        tx3.closed(),
        tx4.closed(),
        tx5.closed()
    );
    println!("Receiver dropped");
}

pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>>

Attempts to immediately send a message on this Sender.

This method differs from send by returning immediately, without waiting, if the channel’s buffer is full or the receive half is closed. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).

§Errors

If the channel capacity has been reached, i.e., the channel has n buffered values where n is the argument passed to channel, then an error is returned.

If the receive half of the channel is closed, either due to close being called or the Receiver handle dropping, the function returns an error. In either case, the error includes the value passed to try_send.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a channel with buffer size 1
    let (tx1, mut rx) = mpsc::channel(1);
    let tx2 = tx1.clone();

    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        tx1.send(2).await.unwrap();
        // task waits until the receiver receives a value.
    });

    tokio::spawn(async move {
        // This will return an error and send
        // no message if the buffer is full
        let _ = tx2.try_send(3);
    });

    let mut msg;
    msg = rx.recv().await.unwrap();
    println!("message {} received", msg);

    msg = rx.recv().await.unwrap();
    println!("message {} received", msg);

    // Third message may have never been sent
    match rx.recv().await {
        Some(msg) => println!("message {} received", msg),
        None => println!("the third message was never sent"),
    }
}

pub async fn send_timeout(&self, value: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>

Sends a value, waiting until there is capacity, but only for a limited time.

Shares the same success and error conditions as send, with one additional error condition: the provided timeout elapses before capacity becomes available.

§Errors

If the receive half of the channel is closed, either due to close being called or the Receiver having been dropped, or if the timeout elapses before capacity becomes available, the function returns an error. The error includes the value passed to send_timeout.

§Panics

This function panics if it is called outside the context of a Tokio runtime with time enabled.

§Examples

In the following example, each call to send_timeout will block until the previously sent value was received, unless the timeout has elapsed.

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        for i in 0..10 {
            if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
                println!("send error: {:?}", e);
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got = {}", i);
        sleep(Duration::from_millis(200)).await;
    }
}

pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>>

Blocking send to call outside of asynchronous contexts.

This method is intended for use cases where you are sending from synchronous code to asynchronous code, and will work even if the receiver is not using blocking_recv to receive the message.

§Panics

This function panics if called within an asynchronous execution context.

§Examples
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn main() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sync_code = thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    Runtime::new().unwrap().block_on(async move {
        assert_eq!(Some(10), rx.recv().await);
    });
    sync_code.join().unwrap();
}

pub fn is_closed(&self) -> bool

Checks if the channel has been closed. This happens when the Receiver is dropped, or when the Receiver::close method is called.

let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
assert!(!tx.is_closed());

let tx2 = tx.clone();
assert!(!tx2.is_closed());

drop(rx);
assert!(tx.is_closed());
assert!(tx2.is_closed());

pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>>

Waits for channel capacity. Once capacity to send one message is available, it is reserved for the caller.

If the channel is full, the function waits for the number of unreceived messages to become less than the channel capacity. Capacity to send one message is reserved for the caller. A Permit is returned to track the reserved capacity. The send function on Permit consumes the reserved capacity.

Dropping Permit without sending a message releases the capacity back to the channel.

§Cancel safety

This channel uses a queue to ensure that calls to send and reserve complete in the order they were requested. Cancelling a call to reserve makes you lose your place in the queue.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Reserve capacity
    let permit = tx.reserve().await.unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Sending on the permit succeeds
    permit.send(456);

    // The value sent on the permit is received
    assert_eq!(rx.recv().await.unwrap(), 456);
}

pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>>

Waits for channel capacity. Once capacity to send n messages is available, it is reserved for the caller.

If the channel is full or if there are fewer than n permits available, the function waits until at least n slots are free. Capacity to send n messages is then reserved for the caller.

A PermitIterator is returned to track the reserved capacity. Each call to next on the iterator yields a Permit, on which you can call Permit::send; the iterator is exhausted after n permits. This function is similar to try_reserve_many except that it waits for the slots to become available.

If the channel is closed, the function returns a SendError.

Dropping PermitIterator without consuming it entirely releases the remaining permits back to the channel.

§Cancel safety

This channel uses a queue to ensure that calls to send and reserve_many complete in the order they were requested. Cancelling a call to reserve_many makes you lose your place in the queue.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);

    // Reserve capacity
    let mut permit = tx.reserve_many(2).await.unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Sending with the permit iterator succeeds
    permit.next().unwrap().send(456);
    permit.next().unwrap().send(457);

    // The iterator should now be exhausted
    assert!(permit.next().is_none());

    // The values sent on the permits are received
    assert_eq!(rx.recv().await.unwrap(), 456);
    assert_eq!(rx.recv().await.unwrap(), 457);
}

pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>>

Waits for channel capacity, moving the Sender and returning an owned permit. Once capacity to send one message is available, it is reserved for the caller.

This moves the sender by value, and returns an owned permit that can be used to send a message into the channel. Unlike Sender::reserve, this method may be used in cases where the permit must be valid for the 'static lifetime. Senders may be cloned cheaply (Sender::clone is essentially a reference count increment, comparable to Arc::clone), so when multiple OwnedPermits are needed or the Sender cannot be moved, it can be cloned prior to calling reserve_owned.

If the channel is full, the function waits for the number of unreceived messages to become less than the channel capacity. Capacity to send one message is reserved for the caller. An OwnedPermit is returned to track the reserved capacity. The send function on OwnedPermit consumes the reserved capacity.

Dropping the OwnedPermit without sending a message releases the capacity back to the channel.

§Cancel safety

This channel uses a queue to ensure that calls to send and reserve complete in the order they were requested. Cancelling a call to reserve_owned makes you lose your place in the queue.

§Examples

Sending a message using an OwnedPermit:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Reserve capacity, moving the sender.
    let permit = tx.reserve_owned().await.unwrap();

    // Send a message, consuming the permit and returning
    // the moved sender.
    let tx = permit.send(123);

    // The value sent on the permit is received.
    assert_eq!(rx.recv().await.unwrap(), 123);

    // The sender can now be used again.
    tx.send(456).await.unwrap();
}

When multiple OwnedPermits are needed, or the sender cannot be moved by value, it can be inexpensively cloned before calling reserve_owned:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Clone the sender and reserve capacity.
    let permit = tx.clone().reserve_owned().await.unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Sending on the permit succeeds.
    permit.send(456);

    // The value sent on the permit is received
    assert_eq!(rx.recv().await.unwrap(), 456);
}

pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>>

Tries to acquire a slot in the channel without waiting for the slot to become available.

If the channel is full, this function returns a TrySendError. Otherwise, if a slot is available, it returns a Permit that lets you send on the channel with a guaranteed slot. This function is similar to reserve except that it does not wait for the slot to become available.

Dropping Permit without sending a message releases the capacity back to the channel.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Reserve capacity
    let permit = tx.try_reserve().unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Trying to reserve an additional slot on the `tx` will
    // fail because there is no capacity.
    assert!(tx.try_reserve().is_err());

    // Sending on the permit succeeds
    permit.send(456);

    // The value sent on the permit is received
    assert_eq!(rx.recv().await.unwrap(), 456);
}

pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>>

Tries to acquire n slots in the channel without waiting for the slots to become available.

A PermitIterator is returned to track the reserved capacity. Each call to next on the iterator yields a Permit, on which you can call Permit::send; the iterator is exhausted after n permits. This function is similar to reserve_many except that it does not wait for the slots to become available.

If there are fewer than n permits available on the channel, then this function will return a TrySendError::Full. If the channel is closed this function will return a TrySendError::Closed.

Dropping PermitIterator without consuming it entirely releases the remaining permits back to the channel.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(2);

    // Reserve capacity
    let mut permit = tx.try_reserve_many(2).unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Trying to reserve an additional slot on the `tx` will
    // fail because there is no capacity.
    assert!(tx.try_reserve().is_err());

    // Sending with the permit iterator succeeds
    permit.next().unwrap().send(456);
    permit.next().unwrap().send(457);

    // The iterator should now be exhausted
    assert!(permit.next().is_none());

    // The values sent on the permits are received
    assert_eq!(rx.recv().await.unwrap(), 456);
    assert_eq!(rx.recv().await.unwrap(), 457);

    // Trying to call try_reserve_many with 0 will return an empty iterator
    let mut permit = tx.try_reserve_many(0).unwrap();
    assert!(permit.next().is_none());

    // Trying to call try_reserve_many with a number greater than the channel
    // capacity will return an error
    let permit = tx.try_reserve_many(3);
    assert!(permit.is_err());

    // Trying to call try_reserve_many on a closed channel will return an error
    drop(rx);
    let permit = tx.try_reserve_many(1);
    assert!(permit.is_err());

    let permit = tx.try_reserve_many(0);
    assert!(permit.is_err());
}

pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>>

Tries to acquire a slot in the channel without waiting for the slot to become available, returning an owned permit.

This moves the sender by value, and returns an owned permit that can be used to send a message into the channel. Unlike Sender::try_reserve, this method may be used in cases where the permit must be valid for the 'static lifetime. Senders may be cloned cheaply (Sender::clone is essentially a reference count increment, comparable to Arc::clone), so when multiple OwnedPermits are needed or the Sender cannot be moved, it can be cloned prior to calling try_reserve_owned.

If the channel is full, this function returns a TrySendError. Since the sender is taken by value, the TrySendError returned in this case contains the sender, so that it may be used again. Otherwise, if a slot is available, this method returns an OwnedPermit that can then be used to send on the channel with a guaranteed slot. This function is similar to reserve_owned except that it does not wait for the slot to become available.

Dropping the OwnedPermit without sending a message releases the capacity back to the channel.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    // Reserve capacity
    let permit = tx.clone().try_reserve_owned().unwrap();

    // Trying to send directly on the `tx` will fail due to no
    // available capacity.
    assert!(tx.try_send(123).is_err());

    // Trying to reserve an additional slot on the `tx` will
    // fail because there is no capacity.
    assert!(tx.try_reserve().is_err());

    // Sending on the permit succeeds
    permit.send(456);

    // The value sent on the permit is received
    assert_eq!(rx.recv().await.unwrap(), 456);
}

pub fn same_channel(&self, other: &Self) -> bool

Returns true if senders belong to the same channel.

§Examples
let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
let tx2 = tx.clone();
assert!(tx.same_channel(&tx2));

let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
assert!(!tx3.same_channel(&tx2));

pub fn capacity(&self) -> usize

Returns the current capacity of the channel.

The capacity goes down when sending a value by calling send or by reserving capacity with reserve. The capacity goes up when values are received by the Receiver. This is distinct from max_capacity, which always returns the buffer capacity initially specified when calling channel.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<()>(5);

    assert_eq!(tx.capacity(), 5);

    // Making a reservation drops the capacity by one.
    let permit = tx.reserve().await.unwrap();
    assert_eq!(tx.capacity(), 4);

    // Sending and receiving a value increases the capacity by one.
    permit.send(());
    rx.recv().await.unwrap();
    assert_eq!(tx.capacity(), 5);
}

pub fn downgrade(&self) -> WeakSender<T>

Converts the Sender to a WeakSender that does not count towards RAII semantics, i.e. if all Sender instances of the channel are dropped and only WeakSender instances remain, the channel is closed.

pub fn max_capacity(&self) -> usize

Returns the maximum buffer capacity of the channel.

The maximum capacity is the buffer capacity initially specified when calling channel. This is distinct from capacity, which returns the current available buffer capacity: as messages are sent and received, the value returned by capacity will go up or down, whereas the value returned by max_capacity will remain constant.

§Examples
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, _rx) = mpsc::channel::<()>(5);

    // both max capacity and capacity are the same at first
    assert_eq!(tx.max_capacity(), 5);
    assert_eq!(tx.capacity(), 5);

    // Making a reservation doesn't change the max capacity.
    let permit = tx.reserve().await.unwrap();
    assert_eq!(tx.max_capacity(), 5);
    // but drops the capacity by one
    assert_eq!(tx.capacity(), 4);
}

pub fn strong_count(&self) -> usize

Returns the number of Sender handles.

pub fn weak_count(&self) -> usize

Returns the number of WeakSender handles.

Trait Implementations§

impl<T> Clone for Sender<T>

fn clone(&self) -> Self

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<T> Debug for Sender<T>

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<T> Freeze for Sender<T>

impl<T> RefUnwindSafe for Sender<T>

impl<T> Send for Sender<T>
where T: Send,

impl<T> Sync for Sender<T>
where T: Send,

impl<T> Unpin for Sender<T>

impl<T> UnwindSafe for Sender<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.