interprocess/os/unix/uds_local_socket/tokio/
stream.rs

1use {
2    super::super::name_to_addr,
3    crate::{
4        error::ReuniteError,
5        local_socket::{traits::tokio as traits, Name},
6        Sealed,
7    },
8    std::{
9        io::{self, ErrorKind::WouldBlock},
10        os::{
11            fd::{AsFd, OwnedFd},
12            unix::{
13                net::{SocketAddr, UnixStream as SyncUnixStream},
14                prelude::BorrowedFd,
15            },
16        },
17        pin::Pin,
18        task::{ready, Context, Poll},
19    },
20    tokio::{
21        io::{AsyncRead, AsyncWrite, ReadBuf},
22        net::{
23            unix::{OwnedReadHalf as RecvHalfImpl, OwnedWriteHalf as SendHalfImpl},
24            UnixStream,
25        },
26    },
27};
28
29/// Wrapper around [`UnixStream`] that implements [`Stream`](traits::Stream).
30#[derive(Debug)]
31pub struct Stream(pub(super) UnixStream);
32impl Sealed for Stream {}
33
34impl Stream {
35    #[allow(clippy::unwrap_used)]
36    async fn _connect(addr: SocketAddr) -> io::Result<UnixStream> {
37        #[cfg(any(target_os = "linux", target_os = "android"))]
38        {
39            #[cfg(target_os = "android")]
40            use std::os::android::net::SocketAddrExt;
41            #[cfg(target_os = "linux")]
42            use std::os::linux::net::SocketAddrExt;
43            if addr.as_abstract_name().is_some() {
44                return tokio::task::spawn_blocking(move || {
45                    let stream = SyncUnixStream::connect_addr(&addr)?;
46                    stream.set_nonblocking(true)?;
47                    Ok::<_, io::Error>(stream)
48                })
49                .await??
50                .try_into();
51            }
52        }
53        UnixStream::connect(addr.as_pathname().unwrap()).await
54    }
55}
56
57impl traits::Stream for Stream {
58    type RecvHalf = RecvHalf;
59    type SendHalf = SendHalf;
60
61    async fn connect(name: Name<'_>) -> io::Result<Self> {
62        Self::_connect(name_to_addr(name, false)?).await.map(Self::from)
63    }
64    fn split(self) -> (RecvHalf, SendHalf) {
65        let (r, w) = self.0.into_split();
66        (RecvHalf(r), SendHalf(w))
67    }
68    #[inline]
69    fn reunite(rh: RecvHalf, sh: SendHalf) -> Result<Self, ReuniteError<RecvHalf, SendHalf>> {
70        rh.0.reunite(sh.0).map(Self::from).map_err(|tokio::net::unix::ReuniteError(rh, sh)| {
71            ReuniteError { rh: RecvHalf(rh), sh: SendHalf(sh) }
72        })
73    }
74}
75
76fn ioloop(
77    mut try_io: impl FnMut() -> io::Result<usize>,
78    mut poll_read_ready: impl FnMut() -> Poll<io::Result<()>>,
79) -> Poll<io::Result<usize>> {
80    loop {
81        match try_io() {
82            Err(e) if e.kind() == WouldBlock => ready!(poll_read_ready()?),
83            els => return Poll::Ready(els),
84        };
85    }
86}
87
88multimacro! {
89    Stream,
90    pinproj_for_unpin(UnixStream),
91    forward_rbv(UnixStream, &),
92    forward_tokio_rw,
93    forward_as_handle(unix),
94    derive_trivial_conv(UnixStream),
95}
96impl AsyncRead for &Stream {
97    #[inline]
98    fn poll_read(
99        self: Pin<&mut Self>,
100        cx: &mut Context<'_>,
101        buf: &mut ReadBuf<'_>,
102    ) -> Poll<io::Result<()>> {
103        ioloop(|| self.0.try_read_buf(buf), || self.0.poll_read_ready(cx)).map(|e| e.map(|_| ()))
104    }
105}
106impl AsyncWrite for &Stream {
107    #[inline]
108    fn poll_write(
109        self: Pin<&mut Self>,
110        cx: &mut Context<'_>,
111        buf: &[u8],
112    ) -> Poll<io::Result<usize>> {
113        ioloop(|| self.0.try_write(buf), || self.0.poll_write_ready(cx))
114    }
115    #[inline]
116    fn poll_write_vectored(
117        self: Pin<&mut Self>,
118        cx: &mut Context<'_>,
119        bufs: &[io::IoSlice<'_>],
120    ) -> Poll<io::Result<usize>> {
121        ioloop(|| self.0.try_write_vectored(bufs), || self.0.poll_write_ready(cx))
122    }
123    #[inline]
124    fn is_write_vectored(&self) -> bool { self.0.is_write_vectored() }
125    #[inline]
126    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
127        Poll::Ready(Ok(()))
128    }
129    #[inline]
130    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
131        Poll::Ready(Ok(()))
132    }
133}
134impl TryFrom<Stream> for OwnedFd {
135    type Error = io::Error;
136    #[inline]
137    fn try_from(slf: Stream) -> io::Result<Self> { Ok(slf.0.into_std()?.into()) }
138}
139impl TryFrom<OwnedFd> for Stream {
140    type Error = io::Error;
141    #[inline]
142    fn try_from(fd: OwnedFd) -> io::Result<Self> {
143        Ok(UnixStream::from_std(SyncUnixStream::from(fd))?.into())
144    }
145}
146
147/// [`Stream`]'s receive half, internally implemented using [`Arc`](std::sync::Arc) by Tokio.
148pub struct RecvHalf(RecvHalfImpl);
149impl Sealed for RecvHalf {}
150impl traits::RecvHalf for RecvHalf {
151    type Stream = Stream;
152}
153multimacro! {
154    RecvHalf,
155    pinproj_for_unpin(RecvHalfImpl),
156    forward_debug("local_socket::RecvHalf"),
157    forward_tokio_read,
158}
159impl AsyncRead for &RecvHalf {
160    #[inline]
161    fn poll_read(
162        self: Pin<&mut Self>,
163        cx: &mut Context<'_>,
164        buf: &mut ReadBuf<'_>,
165    ) -> Poll<io::Result<()>> {
166        ioloop(|| self.0.try_read_buf(buf), || self.0.as_ref().poll_read_ready(cx))
167            .map(|e| e.map(|_| ()))
168    }
169}
170impl AsFd for RecvHalf {
171    #[inline]
172    fn as_fd(&self) -> BorrowedFd<'_> { self.0.as_ref().as_fd() }
173}
174
175/// [`Stream`]'s send half, internally implemented using [`Arc`](std::sync::Arc) by Tokio.
176pub struct SendHalf(SendHalfImpl);
177impl Sealed for SendHalf {}
178impl traits::SendHalf for SendHalf {
179    type Stream = Stream;
180}
181multimacro! {
182    SendHalf,
183    pinproj_for_unpin(SendHalfImpl),
184    forward_rbv(SendHalfImpl, &),
185    forward_debug("local_socket::SendHalf"),
186    forward_tokio_write,
187}
188impl AsyncWrite for &SendHalf {
189    #[inline]
190    fn poll_write(
191        self: Pin<&mut Self>,
192        cx: &mut Context<'_>,
193        buf: &[u8],
194    ) -> Poll<io::Result<usize>> {
195        ioloop(|| self.0.try_write(buf), || self.0.as_ref().poll_write_ready(cx))
196    }
197    #[inline]
198    fn poll_write_vectored(
199        self: Pin<&mut Self>,
200        cx: &mut Context<'_>,
201        bufs: &[io::IoSlice<'_>],
202    ) -> Poll<io::Result<usize>> {
203        ioloop(|| self.0.try_write_vectored(bufs), || self.0.as_ref().poll_write_ready(cx))
204    }
205    #[inline]
206    fn is_write_vectored(&self) -> bool { self.0.is_write_vectored() }
207    #[inline]
208    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
209        Poll::Ready(Ok(()))
210    }
211    #[inline]
212    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
213        Poll::Ready(Ok(()))
214    }
215}
216impl AsFd for SendHalf {
217    #[inline]
218    fn as_fd(&self) -> BorrowedFd<'_> { self.0.as_ref().as_fd() }
219}