pub struct PollChannel<Resp> { /* private fields */ }Expand description
A channel yielding responses from a poller task.
This stream is backed by a coroutine, and will continue to produce responses
until the poller task is dropped. The poller task is dropped when all
RpcClient instances are dropped, or when all listening PollChannel are
dropped.
The poller task also ignores errors from the server and deserialization errors, and will continue to poll until the client is dropped.
Implementations§
Source§impl<Resp> PollChannel<Resp>
impl<Resp> PollChannel<Resp>
Sourcepub fn resubscribe(&self) -> Self
pub fn resubscribe(&self) -> Self
Resubscribe to the poller task.
Sourcepub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin
pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin
Converts the poll channel into a stream.
Sourcepub fn into_stream_raw(self) -> BroadcastStream<Resp>
pub fn into_stream_raw(self) -> BroadcastStream<Resp>
Converts the poll channel into a stream that also yields lag errors.
Methods from Deref<Target = Receiver<Resp>>§
Sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
Returns the number of messages that were sent into the channel and that
this Receiver has yet to receive.
If the returned value from len is larger than the next largest power of 2
of the capacity of the channel any call to recv will return an
Err(RecvError::Lagged) and any call to try_recv will return an
Err(TryRecvError::Lagged), e.g. if the capacity of the channel is 10,
recv will start to return Err(RecvError::Lagged) once len returns
values larger than 16.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
tx.send(10).unwrap();
tx.send(20).unwrap();
assert_eq!(rx1.len(), 2);
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.len(), 1);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert_eq!(rx1.len(), 0);
}Sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Returns true if there aren’t any messages in the channel that the Receiver
has yet to receive.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
assert!(rx1.is_empty());
tx.send(10).unwrap();
tx.send(20).unwrap();
assert!(!rx1.is_empty());
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert!(rx1.is_empty());
}Sourcepub fn same_channel(&self, other: &Receiver<T>) -> bool
pub fn same_channel(&self, other: &Receiver<T>) -> bool
Returns true if receivers belong to the same channel.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, rx) = broadcast::channel::<()>(16);
let rx2 = tx.subscribe();
assert!(rx.same_channel(&rx2));
let (_tx3, rx3) = broadcast::channel::<()>(16);
assert!(!rx3.same_channel(&rx2));
}Sourcepub fn sender_strong_count(&self) -> usize
pub fn sender_strong_count(&self) -> usize
Returns the number of Sender handles.
Sourcepub fn sender_weak_count(&self) -> usize
pub fn sender_weak_count(&self) -> usize
Returns the number of WeakSender handles.
Sourcepub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
Checks if a channel is closed.
This method returns true if the channel has been closed. The channel is closed
when all Sender have been dropped.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, rx) = broadcast::channel::<()>(10);
assert!(!rx.is_closed());
drop(tx);
assert!(rx.is_closed());
}Sourcepub fn resubscribe(&self) -> Receiver<T>
pub fn resubscribe(&self) -> Receiver<T>
Re-subscribes to the channel starting from the current tail element.
This Receiver handle will receive a clone of all values sent
after it has resubscribed. This will not include elements that are
in the queue of the current receiver. Consider the following example.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(2);
tx.send(1).unwrap();
let mut rx2 = rx.resubscribe();
tx.send(2).unwrap();
assert_eq!(rx2.recv().await.unwrap(), 2);
assert_eq!(rx.recv().await.unwrap(), 1);
}Sourcepub async fn recv(&mut self) -> Result<T, RecvError>
pub async fn recv(&mut self) -> Result<T, RecvError>
Receives the next value for this receiver.
Each Receiver handle will receive a clone of all values sent
after it has subscribed.
Err(RecvError::Closed) is returned when all Sender halves have
dropped, indicating that no further values can be sent on the channel.
If the Receiver handle falls behind, once the channel is full, newly
sent values will overwrite old values. At this point, a call to recv
will return with Err(RecvError::Lagged) and the Receiver’s
internal cursor is updated to point to the oldest value still held by
the channel. A subsequent call to recv will return this value
unless it has been since overwritten.
§Cancel safety
This method is cancel safe. If recv is used as the event in a
tokio::select! statement and some other branch
completes first, it is guaranteed that no messages were received on this
channel.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}Handling lag
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();
// The receiver lagged behind
assert!(rx.recv().await.is_err());
// At this point, we can abort or continue with lost messages
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
}Sourcepub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
Attempts to return a pending value on this receiver without awaiting.
This is useful for a flavor of “optimistic check” before deciding to await on a receiver.
Compared with recv, this function has three failure cases instead of two
(one for closed, one for an empty buffer, one for a lagging receiver).
Err(TryRecvError::Closed) is returned when all Sender halves have
dropped, indicating that no further values can be sent on the channel.
If the Receiver handle falls behind, once the channel is full, newly
sent values will overwrite old values. At this point, a call to recv
will return with Err(TryRecvError::Lagged) and the Receiver’s
internal cursor is updated to point to the oldest value still held by
the channel. A subsequent call to try_recv will return this value
unless it has been since overwritten. If there are no values to
receive, Err(TryRecvError::Empty) is returned.
§Examples
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(16);
assert!(rx.try_recv().is_err());
tx.send(10).unwrap();
let value = rx.try_recv().unwrap();
assert_eq!(10, value);
}Sourcepub fn blocking_recv(&mut self) -> Result<T, RecvError>
pub fn blocking_recv(&mut self) -> Result<T, RecvError>
Blocking receive to call outside of asynchronous contexts.
§Panics
This function panics if called within an asynchronous execution context.
§Examples
use std::thread;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(16);
let sync_code = thread::spawn(move || {
assert_eq!(rx.blocking_recv(), Ok(10));
});
let _ = tx.send(10);
sync_code.join().unwrap();
}Trait Implementations§
Source§impl<Resp: Debug> Debug for PollChannel<Resp>
impl<Resp: Debug> Debug for PollChannel<Resp>
Source§impl<Resp> Deref for PollChannel<Resp>
impl<Resp> Deref for PollChannel<Resp>
Source§impl<Resp> DerefMut for PollChannel<Resp>
impl<Resp> DerefMut for PollChannel<Resp>
Auto Trait Implementations§
impl<Resp> Freeze for PollChannel<Resp>
impl<Resp> RefUnwindSafe for PollChannel<Resp>
impl<Resp> Send for PollChannel<Resp>where
Resp: Send,
impl<Resp> Sync for PollChannel<Resp>where
Resp: Send,
impl<Resp> Unpin for PollChannel<Resp>
impl<Resp> UnwindSafe for PollChannel<Resp>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.