interprocess/os/unix/uds_local_socket/
stream.rs

1use {
2    super::name_to_addr,
3    crate::{
4        error::ReuniteError,
5        local_socket::{
6            traits::{self, ReuniteResult},
7            ConcurrencyDetector, LocalSocketSite, Name,
8        },
9        Sealed, TryClone,
10    },
11    std::{
12        io::{self, prelude::*, IoSlice, IoSliceMut},
13        os::{fd::OwnedFd, unix::net::UnixStream},
14        sync::Arc,
15    },
16};
17
18/// Wrapper around [`UnixStream`] that implements [`Stream`](traits::Stream).
19#[derive(Debug)]
20pub struct Stream(pub(super) UnixStream, ConcurrencyDetector<LocalSocketSite>);
21impl Sealed for Stream {}
22impl traits::Stream for Stream {
23    type RecvHalf = RecvHalf;
24    type SendHalf = SendHalf;
25
26    fn connect(name: Name<'_>) -> io::Result<Self> {
27        UnixStream::connect_addr(&name_to_addr(name, false)?).map(Self::from)
28    }
29    #[inline]
30    fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
31        self.0.set_nonblocking(nonblocking)
32    }
33    #[inline]
34    fn split(self) -> (RecvHalf, SendHalf) {
35        let arc = Arc::new(self);
36        (RecvHalf(Arc::clone(&arc)), SendHalf(arc))
37    }
38    #[inline]
39    #[allow(clippy::unwrap_in_result)]
40    fn reunite(rh: RecvHalf, sh: SendHalf) -> ReuniteResult<Self> {
41        if !Arc::ptr_eq(&rh.0, &sh.0) {
42            return Err(ReuniteError { rh, sh });
43        }
44        drop(rh);
45        let inner = Arc::into_inner(sh.0).expect("stream half inexplicably copied");
46        Ok(inner)
47    }
48}
49
50impl Read for &Stream {
51    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
52        let _guard = self.1.lock();
53        (&mut &self.0).read(buf)
54    }
55    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
56        let _guard = self.1.lock();
57        (&mut &self.0).read_vectored(bufs)
58    }
59    // FUTURE is_read_vectored
60}
61impl Write for &Stream {
62    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
63        let _guard = self.1.lock();
64        (&mut &self.0).write(buf)
65    }
66    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
67        let _guard = self.1.lock();
68        (&mut &self.0).write_vectored(bufs)
69    }
70    #[inline]
71    fn flush(&mut self) -> io::Result<()> { Ok(()) }
72    // FUTURE is_write_vectored
73}
74
75impl From<UnixStream> for Stream {
76    fn from(s: UnixStream) -> Self { Self(s, ConcurrencyDetector::new()) }
77}
78
79impl From<OwnedFd> for Stream {
80    fn from(fd: OwnedFd) -> Self { UnixStream::from(fd).into() }
81}
82
83impl TryClone for Stream {
84    #[inline]
85    fn try_clone(&self) -> std::io::Result<Self> { self.0.try_clone().map(Self::from) }
86}
87
88multimacro! {
89    Stream,
90    forward_asinto_handle(unix),
91    derive_sync_mut_rw,
92}
93
94/// [`Stream`]'s receive half, implemented using [`Arc`].
95#[derive(Debug)]
96pub struct RecvHalf(pub(super) Arc<Stream>);
97impl Sealed for RecvHalf {}
98impl traits::RecvHalf for RecvHalf {
99    type Stream = Stream;
100}
101multimacro! {
102    RecvHalf,
103    forward_rbv(Stream, *),
104    forward_sync_ref_read,
105    forward_as_handle,
106    derive_sync_mut_read,
107}
108
109/// [`Stream`]'s send half, implemented using [`Arc`].
110#[derive(Debug)]
111pub struct SendHalf(pub(super) Arc<Stream>);
112impl Sealed for SendHalf {}
113impl traits::SendHalf for SendHalf {
114    type Stream = Stream;
115}
116multimacro! {
117    SendHalf,
118    forward_rbv(Stream, *),
119    forward_sync_ref_write,
120    forward_as_handle,
121    derive_sync_mut_write,
122}