interprocess/local_socket/tokio/stream/
enum.rs

1#[cfg(unix)]
2use crate::os::unix::uds_local_socket::tokio as uds_impl;
3#[cfg(windows)]
4use crate::os::windows::named_pipe::local_socket::tokio as np_impl;
5use {
6    super::r#trait,
7    crate::local_socket::Name,
8    std::{
9        io,
10        pin::Pin,
11        task::{Context, Poll},
12    },
13    tokio::io::{AsyncRead, AsyncWrite, ReadBuf},
14};
15
16impmod! {local_socket::dispatch_tokio as dispatch}
17
18macro_rules! dispatch_read {
19    (@iw $ty:ident) => {
20        #[inline]
21        fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
22            dispatch!($ty: x in self.get_mut() => Pin::new(x).poll_read(cx, buf))
23        }
24    };
25    ($ty:ident) => {
26        impl AsyncRead for &$ty {
27            dispatch_read!(@iw $ty);
28        }
29        impl AsyncRead for $ty {
30            dispatch_read!(@iw $ty);
31        }
32    };
33}
34macro_rules! dispatch_write {
35    (@iw $ty:ident) => {
36        #[inline]
37        fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
38            dispatch!($ty: x in self.get_mut() => Pin::new(x).poll_write(cx, buf))
39        }
40        #[inline]
41        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
42            Poll::Ready(Ok(()))
43        }
44        #[inline]
45        fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
46            Poll::Ready(Ok(()))
47        }
48    };
49    ($ty:ident) => {
50        /// Flushing and shutdown are always successful no-ops.
51        impl AsyncWrite for &$ty {
52            dispatch_write!(@iw $ty);
53        }
54        /// Flushing and shutdown are always successful no-ops.
55        impl AsyncWrite for $ty {
56            dispatch_write!(@iw $ty);
57        }
58    };
59}
60
61mkenum!(
62/// Tokio-based local socket byte stream, obtained either from [`Listener`](super::super::Listener)
63/// or by connecting to an existing local socket.
64///
65/// # Examples
66///
67/// ## Basic client
68/// ```no_run
69#[doc = doctest_file::include_doctest!("examples/local_socket/tokio/listener.rs")]
70/// ```
71Stream);
72
73impl r#trait::Stream for Stream {
74    type RecvHalf = RecvHalf;
75    type SendHalf = SendHalf;
76
77    #[inline]
78    async fn connect(name: Name<'_>) -> io::Result<Self> { dispatch::connect(name).await }
79    fn split(self) -> (RecvHalf, SendHalf) {
80        match self {
81            #[cfg(windows)]
82            Stream::NamedPipe(s) => {
83                let (rh, sh) = s.split();
84                (RecvHalf::NamedPipe(rh), SendHalf::NamedPipe(sh))
85            }
86            #[cfg(unix)]
87            Stream::UdSocket(s) => {
88                let (rh, sh) = s.split();
89                (RecvHalf::UdSocket(rh), SendHalf::UdSocket(sh))
90            }
91        }
92    }
93    fn reunite(rh: RecvHalf, sh: SendHalf) -> ReuniteResult {
94        match (rh, sh) {
95            #[cfg(windows)]
96            (RecvHalf::NamedPipe(rh), SendHalf::NamedPipe(sh)) => {
97                np_impl::Stream::reunite(rh, sh).map(From::from).map_err(|e| e.convert_halves())
98            }
99            #[cfg(unix)]
100            (RecvHalf::UdSocket(rh), SendHalf::UdSocket(sh)) => {
101                uds_impl::Stream::reunite(rh, sh).map(From::from).map_err(|e| e.convert_halves())
102            }
103            #[allow(unreachable_patterns)]
104            (rh, sh) => Err(ReuniteError { rh, sh }),
105        }
106    }
107}
108multimacro! {
109    Stream,
110    dispatch_read,
111    dispatch_write,
112}
113
114mkenum!(
115/// Receive half of a Tokio-based local socket stream, obtained by splitting a [`Stream`].
116"local_socket::tokio::" RecvHalf);
117impl r#trait::RecvHalf for RecvHalf {
118    type Stream = Stream;
119}
120multimacro! {
121    RecvHalf,
122    dispatch_read,
123}
124
125mkenum!(
126/// Send half of a Tokio-based local socket stream, obtained by splitting a [`Stream`].
127"local_socket::tokio::" SendHalf);
128impl r#trait::SendHalf for SendHalf {
129    type Stream = Stream;
130}
131multimacro! {
132    SendHalf,
133    dispatch_write,
134}
135
136/// [`ReuniteError`](crate::error::ReuniteError) for [`Stream`].
137pub type ReuniteError = crate::error::ReuniteError<RecvHalf, SendHalf>;
138
139/// Result type for [`.reunite()`](trait::Stream::reunite) on [`Stream`].
140pub type ReuniteResult = r#trait::ReuniteResult<Stream>;