interprocess/local_socket/tokio/stream/
enum.rs1#[cfg(unix)]
2use crate::os::unix::uds_local_socket::tokio as uds_impl;
3#[cfg(windows)]
4use crate::os::windows::named_pipe::local_socket::tokio as np_impl;
5use {
6 super::r#trait,
7 crate::local_socket::Name,
8 std::{
9 io,
10 pin::Pin,
11 task::{Context, Poll},
12 },
13 tokio::io::{AsyncRead, AsyncWrite, ReadBuf},
14};
15
16impmod! {local_socket::dispatch_tokio as dispatch}
17
// Generates `AsyncRead` impls for a dispatch enum, both for the owned type and for a shared
// reference to it. Every poll is forwarded through `dispatch!` to whichever backend stream the
// enum currently wraps.
macro_rules! dispatch_read {
    // Internal rule: emits only the method body so that both impls below can share it.
    (@iw $ty:ident) => {
        #[inline]
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            // The wrapped value is re-pinned with `Pin::new` before the call is delegated.
            dispatch!($ty: inner in self.get_mut() => Pin::new(inner).poll_read(cx, buf))
        }
    };
    // Public rule: implements the trait for `$ty` and for `&$ty`.
    ($ty:ident) => {
        impl AsyncRead for $ty {
            dispatch_read!(@iw $ty);
        }
        impl AsyncRead for &$ty {
            dispatch_read!(@iw $ty);
        }
    };
}
// Generates `AsyncWrite` impls for a dispatch enum, both for the owned type and for a shared
// reference to it.
//
// Only `poll_write` is forwarded to the wrapped backend stream. `poll_flush` and `poll_shutdown`
// are unconditional no-ops that report success immediately and never touch the backend.
macro_rules! dispatch_write {
    // Internal rule: emits only the method bodies so that both impls below can share them.
    (@iw $ty:ident) => {
        #[inline]
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            // The wrapped value is re-pinned with `Pin::new` before the call is delegated.
            dispatch!($ty: inner in self.get_mut() => Pin::new(inner).poll_write(cx, buf))
        }

        // Always ready, always successful; nothing is forwarded to the backend.
        #[inline]
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        // Always ready, always successful; nothing is forwarded to the backend.
        #[inline]
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    };
    // Public rule: implements the trait for `$ty` and for `&$ty`.
    ($ty:ident) => {
        impl AsyncWrite for $ty {
            dispatch_write!(@iw $ty);
        }
        impl AsyncWrite for &$ty {
            dispatch_write!(@iw $ty);
        }
    };
}
60
61mkenum!(
62#[doc = doctest_file::include_doctest!("examples/local_socket/tokio/listener.rs")]
70Stream);
72
73impl r#trait::Stream for Stream {
74 type RecvHalf = RecvHalf;
75 type SendHalf = SendHalf;
76
77 #[inline]
78 async fn connect(name: Name<'_>) -> io::Result<Self> { dispatch::connect(name).await }
79 fn split(self) -> (RecvHalf, SendHalf) {
80 match self {
81 #[cfg(windows)]
82 Stream::NamedPipe(s) => {
83 let (rh, sh) = s.split();
84 (RecvHalf::NamedPipe(rh), SendHalf::NamedPipe(sh))
85 }
86 #[cfg(unix)]
87 Stream::UdSocket(s) => {
88 let (rh, sh) = s.split();
89 (RecvHalf::UdSocket(rh), SendHalf::UdSocket(sh))
90 }
91 }
92 }
93 fn reunite(rh: RecvHalf, sh: SendHalf) -> ReuniteResult {
94 match (rh, sh) {
95 #[cfg(windows)]
96 (RecvHalf::NamedPipe(rh), SendHalf::NamedPipe(sh)) => {
97 np_impl::Stream::reunite(rh, sh).map(From::from).map_err(|e| e.convert_halves())
98 }
99 #[cfg(unix)]
100 (RecvHalf::UdSocket(rh), SendHalf::UdSocket(sh)) => {
101 uds_impl::Stream::reunite(rh, sh).map(From::from).map_err(|e| e.convert_halves())
102 }
103 #[allow(unreachable_patterns)]
104 (rh, sh) => Err(ReuniteError { rh, sh }),
105 }
106 }
107}
108multimacro! {
109 Stream,
110 dispatch_read,
111 dispatch_write,
112}
113
114mkenum!(
115"local_socket::tokio::" RecvHalf);
117impl r#trait::RecvHalf for RecvHalf {
118 type Stream = Stream;
119}
120multimacro! {
121 RecvHalf,
122 dispatch_read,
123}
124
125mkenum!(
126"local_socket::tokio::" SendHalf);
128impl r#trait::SendHalf for SendHalf {
129 type Stream = Stream;
130}
131multimacro! {
132 SendHalf,
133 dispatch_write,
134}
135
136pub type ReuniteError = crate::error::ReuniteError<RecvHalf, SendHalf>;
138
139pub type ReuniteResult = r#trait::ReuniteResult<Stream>;