interprocess/os/unix/uds_local_socket/tokio/
stream.rs1use {
2 super::super::name_to_addr,
3 crate::{
4 error::ReuniteError,
5 local_socket::{traits::tokio as traits, Name},
6 Sealed,
7 },
8 std::{
9 io::{self, ErrorKind::WouldBlock},
10 os::{
11 fd::{AsFd, OwnedFd},
12 unix::{
13 net::{SocketAddr, UnixStream as SyncUnixStream},
14 prelude::BorrowedFd,
15 },
16 },
17 pin::Pin,
18 task::{ready, Context, Poll},
19 },
20 tokio::{
21 io::{AsyncRead, AsyncWrite, ReadBuf},
22 net::{
23 unix::{OwnedReadHalf as RecvHalfImpl, OwnedWriteHalf as SendHalfImpl},
24 UnixStream,
25 },
26 },
27};
28
/// Tokio-based local socket stream backed by a Unix domain socket.
///
/// This is a newtype over Tokio's [`UnixStream`]; the wrapped stream is accessible from the parent
/// module.
#[derive(Debug)]
pub struct Stream(pub(super) UnixStream);
// Marker impl. Presumably this is what allows the crate's sealed traits to be implemented for
// `Stream` — confirm against the definition of `Sealed`.
impl Sealed for Stream {}
33
34impl Stream {
35 #[allow(clippy::unwrap_used)]
36 async fn _connect(addr: SocketAddr) -> io::Result<UnixStream> {
37 #[cfg(any(target_os = "linux", target_os = "android"))]
38 {
39 #[cfg(target_os = "android")]
40 use std::os::android::net::SocketAddrExt;
41 #[cfg(target_os = "linux")]
42 use std::os::linux::net::SocketAddrExt;
43 if addr.as_abstract_name().is_some() {
44 return tokio::task::spawn_blocking(move || {
45 let stream = SyncUnixStream::connect_addr(&addr)?;
46 stream.set_nonblocking(true)?;
47 Ok::<_, io::Error>(stream)
48 })
49 .await??
50 .try_into();
51 }
52 }
53 UnixStream::connect(addr.as_pathname().unwrap()).await
54 }
55}
56
57impl traits::Stream for Stream {
58 type RecvHalf = RecvHalf;
59 type SendHalf = SendHalf;
60
61 async fn connect(name: Name<'_>) -> io::Result<Self> {
62 Self::_connect(name_to_addr(name, false)?).await.map(Self::from)
63 }
64 fn split(self) -> (RecvHalf, SendHalf) {
65 let (r, w) = self.0.into_split();
66 (RecvHalf(r), SendHalf(w))
67 }
68 #[inline]
69 fn reunite(rh: RecvHalf, sh: SendHalf) -> Result<Self, ReuniteError<RecvHalf, SendHalf>> {
70 rh.0.reunite(sh.0).map(Self::from).map_err(|tokio::net::unix::ReuniteError(rh, sh)| {
71 ReuniteError { rh: RecvHalf(rh), sh: SendHalf(sh) }
72 })
73 }
74}
75
/// Drives a non-blocking I/O attempt to completion within a single poll.
///
/// `try_io` is invoked repeatedly. Any outcome other than a [`WouldBlock`] error is returned as
/// `Poll::Ready` right away. On `WouldBlock`, `poll_read_ready` is consulted (despite its name it
/// is simply "the readiness poll" – the callers in this file pass either a read-readiness or a
/// write-readiness check):
/// - if it is pending, `Poll::Pending` is returned;
/// - if it yields an error, that error is returned as `Poll::Ready(Err(..))`;
/// - if it reports readiness, the I/O attempt is retried.
fn ioloop(
    mut try_io: impl FnMut() -> io::Result<usize>,
    mut poll_read_ready: impl FnMut() -> Poll<io::Result<()>>,
) -> Poll<io::Result<usize>> {
    loop {
        let outcome = try_io();
        let would_block = matches!(&outcome, Err(e) if e.kind() == WouldBlock);
        if !would_block {
            return Poll::Ready(outcome);
        }
        // Not ready yet: bail out with `Pending` or the readiness error, otherwise go around again.
        ready!(poll_read_ready()?);
    }
}
87
// Applies a list of project-local helper macros to `Stream`. Their definitions are not visible
// here; judging by the names and by how `Stream` is used in this file, they presumably generate
// pin projection onto the inner `UnixStream`, by-value trait impls forwarding to the `&Stream`
// impls below, raw-handle access, and the `UnixStream` <-> `Stream` conversions that
// `Self::from` / `.into()` rely on — TODO confirm against the macro definitions.
multimacro! {
    Stream,
    pinproj_for_unpin(UnixStream),
    forward_rbv(UnixStream, &),
    forward_tokio_rw,
    forward_as_handle(unix),
    derive_trivial_conv(UnixStream),
}
96impl AsyncRead for &Stream {
97 #[inline]
98 fn poll_read(
99 self: Pin<&mut Self>,
100 cx: &mut Context<'_>,
101 buf: &mut ReadBuf<'_>,
102 ) -> Poll<io::Result<()>> {
103 ioloop(|| self.0.try_read_buf(buf), || self.0.poll_read_ready(cx)).map(|e| e.map(|_| ()))
104 }
105}
106impl AsyncWrite for &Stream {
107 #[inline]
108 fn poll_write(
109 self: Pin<&mut Self>,
110 cx: &mut Context<'_>,
111 buf: &[u8],
112 ) -> Poll<io::Result<usize>> {
113 ioloop(|| self.0.try_write(buf), || self.0.poll_write_ready(cx))
114 }
115 #[inline]
116 fn poll_write_vectored(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 bufs: &[io::IoSlice<'_>],
120 ) -> Poll<io::Result<usize>> {
121 ioloop(|| self.0.try_write_vectored(bufs), || self.0.poll_write_ready(cx))
122 }
123 #[inline]
124 fn is_write_vectored(&self) -> bool { self.0.is_write_vectored() }
125 #[inline]
126 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
127 Poll::Ready(Ok(()))
128 }
129 #[inline]
130 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
131 Poll::Ready(Ok(()))
132 }
133}
134impl TryFrom<Stream> for OwnedFd {
135 type Error = io::Error;
136 #[inline]
137 fn try_from(slf: Stream) -> io::Result<Self> { Ok(slf.0.into_std()?.into()) }
138}
139impl TryFrom<OwnedFd> for Stream {
140 type Error = io::Error;
141 #[inline]
142 fn try_from(fd: OwnedFd) -> io::Result<Self> {
143 Ok(UnixStream::from_std(SyncUnixStream::from(fd))?.into())
144 }
145}
146
/// Receive half of a [`Stream`], as produced by splitting it; wraps Tokio's owned read half of a
/// Unix stream.
pub struct RecvHalf(RecvHalfImpl);
// Marker impl; see the definition of `Sealed` for its exact purpose.
impl Sealed for RecvHalf {}
impl traits::RecvHalf for RecvHalf {
    // The stream type this half belongs to.
    type Stream = Stream;
}
// Applies project-local helper macros to `RecvHalf`. Judging by their names they presumably
// generate pin projection onto the inner half, a `Debug` impl printing under the given name, and
// a by-value `AsyncRead` impl forwarding to the `&RecvHalf` one below — TODO confirm against the
// macro definitions.
multimacro! {
    RecvHalf,
    pinproj_for_unpin(RecvHalfImpl),
    forward_debug("local_socket::RecvHalf"),
    forward_tokio_read,
}
159impl AsyncRead for &RecvHalf {
160 #[inline]
161 fn poll_read(
162 self: Pin<&mut Self>,
163 cx: &mut Context<'_>,
164 buf: &mut ReadBuf<'_>,
165 ) -> Poll<io::Result<()>> {
166 ioloop(|| self.0.try_read_buf(buf), || self.0.as_ref().poll_read_ready(cx))
167 .map(|e| e.map(|_| ()))
168 }
169}
170impl AsFd for RecvHalf {
171 #[inline]
172 fn as_fd(&self) -> BorrowedFd<'_> { self.0.as_ref().as_fd() }
173}
174
/// Send half of a [`Stream`], as produced by splitting it; wraps Tokio's owned write half of a
/// Unix stream.
pub struct SendHalf(SendHalfImpl);
// Marker impl; see the definition of `Sealed` for its exact purpose.
impl Sealed for SendHalf {}
impl traits::SendHalf for SendHalf {
    // The stream type this half belongs to.
    type Stream = Stream;
}
// Applies project-local helper macros to `SendHalf`. Judging by their names they presumably
// generate pin projection onto the inner half, by-value forwarding to the `&SendHalf` impls, a
// `Debug` impl printing under the given name, and a by-value `AsyncWrite` impl — TODO confirm
// against the macro definitions.
multimacro! {
    SendHalf,
    pinproj_for_unpin(SendHalfImpl),
    forward_rbv(SendHalfImpl, &),
    forward_debug("local_socket::SendHalf"),
    forward_tokio_write,
}
188impl AsyncWrite for &SendHalf {
189 #[inline]
190 fn poll_write(
191 self: Pin<&mut Self>,
192 cx: &mut Context<'_>,
193 buf: &[u8],
194 ) -> Poll<io::Result<usize>> {
195 ioloop(|| self.0.try_write(buf), || self.0.as_ref().poll_write_ready(cx))
196 }
197 #[inline]
198 fn poll_write_vectored(
199 self: Pin<&mut Self>,
200 cx: &mut Context<'_>,
201 bufs: &[io::IoSlice<'_>],
202 ) -> Poll<io::Result<usize>> {
203 ioloop(|| self.0.try_write_vectored(bufs), || self.0.as_ref().poll_write_ready(cx))
204 }
205 #[inline]
206 fn is_write_vectored(&self) -> bool { self.0.is_write_vectored() }
207 #[inline]
208 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
209 Poll::Ready(Ok(()))
210 }
211 #[inline]
212 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
213 Poll::Ready(Ok(()))
214 }
215}
216impl AsFd for SendHalf {
217 #[inline]
218 fn as_fd(&self) -> BorrowedFd<'_> { self.0.as_ref().as_fd() }
219}