interprocess/os/unix/uds_local_socket/
stream.rs1use {
2 super::name_to_addr,
3 crate::{
4 error::ReuniteError,
5 local_socket::{
6 traits::{self, ReuniteResult},
7 ConcurrencyDetector, LocalSocketSite, Name,
8 },
9 Sealed, TryClone,
10 },
11 std::{
12 io::{self, prelude::*, IoSlice, IoSliceMut},
13 os::{fd::OwnedFd, unix::net::UnixStream},
14 sync::Arc,
15 },
16};
17
18#[derive(Debug)]
20pub struct Stream(pub(super) UnixStream, ConcurrencyDetector<LocalSocketSite>);
21impl Sealed for Stream {}
22impl traits::Stream for Stream {
23 type RecvHalf = RecvHalf;
24 type SendHalf = SendHalf;
25
26 fn connect(name: Name<'_>) -> io::Result<Self> {
27 UnixStream::connect_addr(&name_to_addr(name, false)?).map(Self::from)
28 }
29 #[inline]
30 fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
31 self.0.set_nonblocking(nonblocking)
32 }
33 #[inline]
34 fn split(self) -> (RecvHalf, SendHalf) {
35 let arc = Arc::new(self);
36 (RecvHalf(Arc::clone(&arc)), SendHalf(arc))
37 }
38 #[inline]
39 #[allow(clippy::unwrap_in_result)]
40 fn reunite(rh: RecvHalf, sh: SendHalf) -> ReuniteResult<Self> {
41 if !Arc::ptr_eq(&rh.0, &sh.0) {
42 return Err(ReuniteError { rh, sh });
43 }
44 drop(rh);
45 let inner = Arc::into_inner(sh.0).expect("stream half inexplicably copied");
46 Ok(inner)
47 }
48}
49
50impl Read for &Stream {
51 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
52 let _guard = self.1.lock();
53 (&mut &self.0).read(buf)
54 }
55 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
56 let _guard = self.1.lock();
57 (&mut &self.0).read_vectored(bufs)
58 }
59 }
61impl Write for &Stream {
62 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
63 let _guard = self.1.lock();
64 (&mut &self.0).write(buf)
65 }
66 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
67 let _guard = self.1.lock();
68 (&mut &self.0).write_vectored(bufs)
69 }
70 #[inline]
71 fn flush(&mut self) -> io::Result<()> { Ok(()) }
72 }
74
75impl From<UnixStream> for Stream {
76 fn from(s: UnixStream) -> Self { Self(s, ConcurrencyDetector::new()) }
77}
78
79impl From<OwnedFd> for Stream {
80 fn from(fd: OwnedFd) -> Self { UnixStream::from(fd).into() }
81}
82
83impl TryClone for Stream {
84 #[inline]
85 fn try_clone(&self) -> std::io::Result<Self> { self.0.try_clone().map(Self::from) }
86}
87
88multimacro! {
89 Stream,
90 forward_asinto_handle(unix),
91 derive_sync_mut_rw,
92}
93
94#[derive(Debug)]
96pub struct RecvHalf(pub(super) Arc<Stream>);
97impl Sealed for RecvHalf {}
98impl traits::RecvHalf for RecvHalf {
99 type Stream = Stream;
100}
101multimacro! {
102 RecvHalf,
103 forward_rbv(Stream, *),
104 forward_sync_ref_read,
105 forward_as_handle,
106 derive_sync_mut_read,
107}
108
109#[derive(Debug)]
111pub struct SendHalf(pub(super) Arc<Stream>);
112impl Sealed for SendHalf {}
113impl traits::SendHalf for SendHalf {
114 type Stream = Stream;
115}
116multimacro! {
117 SendHalf,
118 forward_rbv(Stream, *),
119 forward_sync_ref_write,
120 forward_as_handle,
121 derive_sync_mut_write,
122}