tokio/net/unix/datagram/
socket.rs

1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3use crate::util::check_socket_for_blocking;
4
5use std::fmt;
6use std::io;
7use std::net::Shutdown;
8use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
9use std::os::unix::net;
10use std::path::Path;
11use std::task::{ready, Context, Poll};
12
13cfg_io_util! {
14    use bytes::BufMut;
15}
16
17cfg_net_unix! {
18    /// An I/O object representing a Unix datagram socket.
19    ///
20    /// A socket can be either named (associated with a filesystem path) or
21    /// unnamed.
22    ///
23    /// This type does not provide a `split` method, because this functionality
24    /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
25    /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
26    /// is enough. This is because all of the methods take `&self` instead of
27    /// `&mut self`.
28    ///
29    /// **Note:** named sockets are persisted even after the object is dropped
30    /// and the program has exited, and cannot be reconnected. It is advised
31    /// that you either check for and unlink the existing socket if it exists,
32    /// or use a temporary file that is guaranteed to not already exist.
33    ///
34    /// [`Arc`]: std::sync::Arc
35    ///
36    /// # Examples
37    /// Using named sockets, associated with a filesystem path:
38    /// ```
39    /// # if cfg!(miri) { return } // No `socket` in miri.
40    /// # use std::error::Error;
41    /// # #[tokio::main]
42    /// # async fn main() -> Result<(), Box<dyn Error>> {
43    /// use tokio::net::UnixDatagram;
44    /// use tempfile::tempdir;
45    ///
46    /// // We use a temporary directory so that the socket
47    /// // files left by the bound sockets will get cleaned up.
48    /// let tmp = tempdir()?;
49    ///
50    /// // Bind each socket to a filesystem path
51    /// let tx_path = tmp.path().join("tx");
52    /// let tx = UnixDatagram::bind(&tx_path)?;
53    /// let rx_path = tmp.path().join("rx");
54    /// let rx = UnixDatagram::bind(&rx_path)?;
55    ///
56    /// let bytes = b"hello world";
57    /// tx.send_to(bytes, &rx_path).await?;
58    ///
59    /// let mut buf = vec![0u8; 24];
60    /// let (size, addr) = rx.recv_from(&mut buf).await?;
61    ///
62    /// let dgram = &buf[..size];
63    /// assert_eq!(dgram, bytes);
64    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
65    ///
66    /// # Ok(())
67    /// # }
68    /// ```
69    ///
70    /// Using unnamed sockets, created as a pair
71    /// ```
72    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
73    /// # use std::error::Error;
74    /// # #[tokio::main]
75    /// # async fn main() -> Result<(), Box<dyn Error>> {
76    /// use tokio::net::UnixDatagram;
77    ///
78    /// // Create the pair of sockets
79    /// let (sock1, sock2) = UnixDatagram::pair()?;
80    ///
81    /// // Since the sockets are paired, the paired send/recv
82    /// // functions can be used
83    /// let bytes = b"hello world";
84    /// sock1.send(bytes).await?;
85    ///
86    /// let mut buff = vec![0u8; 24];
87    /// let size = sock2.recv(&mut buff).await?;
88    ///
89    /// let dgram = &buff[..size];
90    /// assert_eq!(dgram, bytes);
91    ///
92    /// # Ok(())
93    /// # }
94    /// ```
95    #[cfg_attr(docsrs, doc(alias = "uds"))]
96    pub struct UnixDatagram {
97        io: PollEvented<mio::net::UnixDatagram>,
98    }
99}
100
101impl UnixDatagram {
102    pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
103        let datagram = UnixDatagram::new(sys)?;
104
105        if let Some(e) = datagram.io.take_error()? {
106            return Err(e);
107        }
108
109        Ok(datagram)
110    }
111
112    /// Waits for any of the requested ready states.
113    ///
114    /// This function is usually paired with `try_recv()` or `try_send()`. It
115    /// can be used to concurrently `recv` / `send` to the same socket on a single
116    /// task without splitting the socket.
117    ///
118    /// The function may complete without the socket being ready. This is a
119    /// false-positive and attempting an operation will return with
120    /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
121    /// [`Ready`] set, so you should always check the returned value and possibly
122    /// wait again if the requested states are not set.
123    ///
124    /// # Cancel safety
125    ///
126    /// This method is cancel safe. Once a readiness event occurs, the method
127    /// will continue to return immediately until the readiness event is
128    /// consumed by an attempt to read or write that fails with `WouldBlock` or
129    /// `Poll::Pending`.
130    ///
131    /// # Examples
132    ///
133    /// Concurrently receive from and send to the socket on the same task
134    /// without splitting.
135    ///
136    /// ```no_run
137    /// use tokio::io::Interest;
138    /// use tokio::net::UnixDatagram;
139    /// use std::io;
140    ///
141    /// #[tokio::main]
142    /// async fn main() -> io::Result<()> {
143    ///     let dir = tempfile::tempdir().unwrap();
144    ///     let client_path = dir.path().join("client.sock");
145    ///     let server_path = dir.path().join("server.sock");
146    ///     let socket = UnixDatagram::bind(&client_path)?;
147    ///     socket.connect(&server_path)?;
148    ///
149    ///     loop {
150    ///         let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
151    ///
152    ///         if ready.is_readable() {
153    ///             let mut data = [0; 1024];
154    ///             match socket.try_recv(&mut data[..]) {
155    ///                 Ok(n) => {
156    ///                     println!("received {:?}", &data[..n]);
157    ///                 }
158    ///                 // False-positive, continue
159    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
160    ///                 Err(e) => {
161    ///                     return Err(e);
162    ///                 }
163    ///             }
164    ///         }
165    ///
166    ///         if ready.is_writable() {
167    ///             // Write some data
168    ///             match socket.try_send(b"hello world") {
169    ///                 Ok(n) => {
170    ///                     println!("sent {} bytes", n);
171    ///                 }
172    ///                 // False-positive, continue
173    ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
174    ///                 Err(e) => {
175    ///                     return Err(e);
176    ///                 }
177    ///             }
178    ///         }
179    ///     }
180    /// }
181    /// ```
182    pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
183        let event = self.io.registration().readiness(interest).await?;
184        Ok(event.ready)
185    }
186
187    /// Waits for the socket to become writable.
188    ///
189    /// This function is equivalent to `ready(Interest::WRITABLE)` and is
190    /// usually paired with `try_send()` or `try_send_to()`.
191    ///
192    /// The function may complete without the socket being writable. This is a
193    /// false-positive and attempting a `try_send()` will return with
194    /// `io::ErrorKind::WouldBlock`.
195    ///
196    /// # Cancel safety
197    ///
198    /// This method is cancel safe. Once a readiness event occurs, the method
199    /// will continue to return immediately until the readiness event is
200    /// consumed by an attempt to write that fails with `WouldBlock` or
201    /// `Poll::Pending`.
202    ///
203    /// # Examples
204    ///
205    /// ```no_run
206    /// use tokio::net::UnixDatagram;
207    /// use std::io;
208    ///
209    /// #[tokio::main]
210    /// async fn main() -> io::Result<()> {
211    ///     let dir = tempfile::tempdir().unwrap();
212    ///     let client_path = dir.path().join("client.sock");
213    ///     let server_path = dir.path().join("server.sock");
214    ///     let socket = UnixDatagram::bind(&client_path)?;
215    ///     socket.connect(&server_path)?;
216    ///
217    ///     loop {
218    ///         // Wait for the socket to be writable
219    ///         socket.writable().await?;
220    ///
221    ///         // Try to send data, this may still fail with `WouldBlock`
222    ///         // if the readiness event is a false positive.
223    ///         match socket.try_send(b"hello world") {
224    ///             Ok(n) => {
225    ///                 break;
226    ///             }
227    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
228    ///                 continue;
229    ///             }
230    ///             Err(e) => {
231    ///                 return Err(e);
232    ///             }
233    ///         }
234    ///     }
235    ///
236    ///     Ok(())
237    /// }
238    /// ```
239    pub async fn writable(&self) -> io::Result<()> {
240        self.ready(Interest::WRITABLE).await?;
241        Ok(())
242    }
243
244    /// Polls for write/send readiness.
245    ///
246    /// If the socket is not currently ready for sending, this method will
247    /// store a clone of the `Waker` from the provided `Context`. When the socket
248    /// becomes ready for sending, `Waker::wake` will be called on the
249    /// waker.
250    ///
251    /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
252    /// the `Waker` from the `Context` passed to the most recent call is
253    /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
254    /// second, independent waker.)
255    ///
256    /// This function is intended for cases where creating and pinning a future
257    /// via [`writable`] is not feasible. Where possible, using [`writable`] is
258    /// preferred, as this supports polling from multiple tasks at once.
259    ///
260    /// # Return value
261    ///
262    /// The function returns:
263    ///
264    /// * `Poll::Pending` if the socket is not ready for writing.
265    /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
266    /// * `Poll::Ready(Err(e))` if an error is encountered.
267    ///
268    /// # Errors
269    ///
270    /// This function may encounter any standard I/O error except `WouldBlock`.
271    ///
272    /// [`writable`]: method@Self::writable
273    pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
274        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
275    }
276
277    /// Waits for the socket to become readable.
278    ///
279    /// This function is equivalent to `ready(Interest::READABLE)` and is usually
280    /// paired with `try_recv()`.
281    ///
282    /// The function may complete without the socket being readable. This is a
283    /// false-positive and attempting a `try_recv()` will return with
284    /// `io::ErrorKind::WouldBlock`.
285    ///
286    /// # Cancel safety
287    ///
288    /// This method is cancel safe. Once a readiness event occurs, the method
289    /// will continue to return immediately until the readiness event is
290    /// consumed by an attempt to read that fails with `WouldBlock` or
291    /// `Poll::Pending`.
292    ///
293    /// # Examples
294    ///
295    /// ```no_run
296    /// use tokio::net::UnixDatagram;
297    /// use std::io;
298    ///
299    /// #[tokio::main]
300    /// async fn main() -> io::Result<()> {
301    ///     // Connect to a peer
302    ///     let dir = tempfile::tempdir().unwrap();
303    ///     let client_path = dir.path().join("client.sock");
304    ///     let server_path = dir.path().join("server.sock");
305    ///     let socket = UnixDatagram::bind(&client_path)?;
306    ///     socket.connect(&server_path)?;
307    ///
308    ///     loop {
309    ///         // Wait for the socket to be readable
310    ///         socket.readable().await?;
311    ///
312    ///         // The buffer is **not** included in the async task and will
313    ///         // only exist on the stack.
314    ///         let mut buf = [0; 1024];
315    ///
316    ///         // Try to recv data, this may still fail with `WouldBlock`
317    ///         // if the readiness event is a false positive.
318    ///         match socket.try_recv(&mut buf) {
319    ///             Ok(n) => {
320    ///                 println!("GOT {:?}", &buf[..n]);
321    ///                 break;
322    ///             }
323    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
324    ///                 continue;
325    ///             }
326    ///             Err(e) => {
327    ///                 return Err(e);
328    ///             }
329    ///         }
330    ///     }
331    ///
332    ///     Ok(())
333    /// }
334    /// ```
335    pub async fn readable(&self) -> io::Result<()> {
336        self.ready(Interest::READABLE).await?;
337        Ok(())
338    }
339
340    /// Polls for read/receive readiness.
341    ///
342    /// If the socket is not currently ready for receiving, this method will
343    /// store a clone of the `Waker` from the provided `Context`. When the
344    /// socket becomes ready for reading, `Waker::wake` will be called on the
345    /// waker.
346    ///
347    /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
348    /// `poll_peek`, only the `Waker` from the `Context` passed to the most
349    /// recent call is scheduled to receive a wakeup. (However,
350    /// `poll_send_ready` retains a second, independent waker.)
351    ///
352    /// This function is intended for cases where creating and pinning a future
353    /// via [`readable`] is not feasible. Where possible, using [`readable`] is
354    /// preferred, as this supports polling from multiple tasks at once.
355    ///
356    /// # Return value
357    ///
358    /// The function returns:
359    ///
360    /// * `Poll::Pending` if the socket is not ready for reading.
361    /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
362    /// * `Poll::Ready(Err(e))` if an error is encountered.
363    ///
364    /// # Errors
365    ///
366    /// This function may encounter any standard I/O error except `WouldBlock`.
367    ///
368    /// [`readable`]: method@Self::readable
369    pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
370        self.io.registration().poll_read_ready(cx).map_ok(|_| ())
371    }
372
373    /// Creates a new `UnixDatagram` bound to the specified path.
374    ///
375    /// # Examples
376    /// ```
377    /// # if cfg!(miri) { return } // No `socket` in miri.
378    /// # use std::error::Error;
379    /// # #[tokio::main]
380    /// # async fn main() -> Result<(), Box<dyn Error>> {
381    /// use tokio::net::UnixDatagram;
382    /// use tempfile::tempdir;
383    ///
384    /// // We use a temporary directory so that the socket
385    /// // files left by the bound sockets will get cleaned up.
386    /// let tmp = tempdir()?;
387    ///
388    /// // Bind the socket to a filesystem path
389    /// let socket_path = tmp.path().join("socket");
390    /// let socket = UnixDatagram::bind(&socket_path)?;
391    ///
392    /// # Ok(())
393    /// # }
394    /// ```
395    pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
396    where
397        P: AsRef<Path>,
398    {
399        let socket = mio::net::UnixDatagram::bind(path)?;
400        UnixDatagram::new(socket)
401    }
402
403    /// Creates an unnamed pair of connected sockets.
404    ///
405    /// This function will create a pair of interconnected Unix sockets for
406    /// communicating back and forth between one another.
407    ///
408    /// # Examples
409    /// ```
410    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
411    /// # use std::error::Error;
412    /// # #[tokio::main]
413    /// # async fn main() -> Result<(), Box<dyn Error>> {
414    /// use tokio::net::UnixDatagram;
415    ///
416    /// // Create the pair of sockets
417    /// let (sock1, sock2) = UnixDatagram::pair()?;
418    ///
419    /// // Since the sockets are paired, the paired send/recv
420    /// // functions can be used
421    /// let bytes = b"hail eris";
422    /// sock1.send(bytes).await?;
423    ///
424    /// let mut buff = vec![0u8; 24];
425    /// let size = sock2.recv(&mut buff).await?;
426    ///
427    /// let dgram = &buff[..size];
428    /// assert_eq!(dgram, bytes);
429    ///
430    /// # Ok(())
431    /// # }
432    /// ```
433    pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
434        let (a, b) = mio::net::UnixDatagram::pair()?;
435        let a = UnixDatagram::new(a)?;
436        let b = UnixDatagram::new(b)?;
437
438        Ok((a, b))
439    }
440
441    /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
442    ///
443    /// This function is intended to be used to wrap a `UnixDatagram` from the
444    /// standard library in the Tokio equivalent.
445    ///
446    /// # Notes
447    ///
448    /// The caller is responsible for ensuring that the socket is in
449    /// non-blocking mode. Otherwise all I/O operations on the socket
450    /// will block the thread, which will cause unexpected behavior.
451    /// Non-blocking mode can be set using [`set_nonblocking`].
452    ///
453    /// Passing a listener in blocking mode is always erroneous,
454    /// and the behavior in that case may change in the future.
455    /// For example, it could panic.
456    ///
457    /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
458    ///
459    /// # Panics
460    ///
461    /// This function panics if it is not called from within a runtime with
462    /// IO enabled.
463    ///
464    /// The runtime is usually set implicitly when this function is called
465    /// from a future driven by a Tokio runtime, otherwise runtime can be set
466    /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
467    /// # Examples
468    /// ```
469    /// # if cfg!(miri) { return } // No `socket` in miri.
470    /// # use std::error::Error;
471    /// # #[tokio::main]
472    /// # async fn main() -> Result<(), Box<dyn Error>> {
473    /// use tokio::net::UnixDatagram;
474    /// use std::os::unix::net::UnixDatagram as StdUDS;
475    /// use tempfile::tempdir;
476    ///
477    /// // We use a temporary directory so that the socket
478    /// // files left by the bound sockets will get cleaned up.
479    /// let tmp = tempdir()?;
480    ///
481    /// // Bind the socket to a filesystem path
482    /// let socket_path = tmp.path().join("socket");
483    /// let std_socket = StdUDS::bind(&socket_path)?;
484    /// std_socket.set_nonblocking(true)?;
485    /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
486    ///
487    /// # Ok(())
488    /// # }
489    /// ```
490    #[track_caller]
491    pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
492        check_socket_for_blocking(&datagram)?;
493
494        let socket = mio::net::UnixDatagram::from_std(datagram);
495        let io = PollEvented::new(socket)?;
496        Ok(UnixDatagram { io })
497    }
498
499    /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
500    ///
501    /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
502    /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
503    /// if needed.
504    ///
505    /// # Examples
506    ///
507    /// ```rust,no_run
508    /// # use std::error::Error;
509    /// # async fn dox() -> Result<(), Box<dyn Error>> {
510    /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
511    /// let std_socket = tokio_socket.into_std()?;
512    /// std_socket.set_nonblocking(false)?;
513    /// # Ok(())
514    /// # }
515    /// ```
516    ///
517    /// [`tokio::net::UnixDatagram`]: UnixDatagram
518    /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
519    /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
520    pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
521        self.io
522            .into_inner()
523            .map(IntoRawFd::into_raw_fd)
524            .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
525    }
526
527    fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
528        let io = PollEvented::new(socket)?;
529        Ok(UnixDatagram { io })
530    }
531
532    /// Creates a new `UnixDatagram` which is not bound to any address.
533    ///
534    /// # Examples
535    /// ```
536    /// # if cfg!(miri) { return } // No `socket` in miri.
537    /// # use std::error::Error;
538    /// # #[tokio::main]
539    /// # async fn main() -> Result<(), Box<dyn Error>> {
540    /// use tokio::net::UnixDatagram;
541    /// use tempfile::tempdir;
542    ///
543    /// // Create an unbound socket
544    /// let tx = UnixDatagram::unbound()?;
545    ///
546    /// // Create another, bound socket
547    /// let tmp = tempdir()?;
548    /// let rx_path = tmp.path().join("rx");
549    /// let rx = UnixDatagram::bind(&rx_path)?;
550    ///
551    /// // Send to the bound socket
552    /// let bytes = b"hello world";
553    /// tx.send_to(bytes, &rx_path).await?;
554    ///
555    /// let mut buf = vec![0u8; 24];
556    /// let (size, addr) = rx.recv_from(&mut buf).await?;
557    ///
558    /// let dgram = &buf[..size];
559    /// assert_eq!(dgram, bytes);
560    ///
561    /// # Ok(())
562    /// # }
563    /// ```
564    pub fn unbound() -> io::Result<UnixDatagram> {
565        let socket = mio::net::UnixDatagram::unbound()?;
566        UnixDatagram::new(socket)
567    }
568
569    /// Connects the socket to the specified address.
570    ///
571    /// The `send` method may be used to send data to the specified address.
572    /// `recv` and `recv_from` will only receive data from that address.
573    ///
574    /// # Examples
575    /// ```
576    /// # if cfg!(miri) { return } // No `socket` in miri.
577    /// # use std::error::Error;
578    /// # #[tokio::main]
579    /// # async fn main() -> Result<(), Box<dyn Error>> {
580    /// use tokio::net::UnixDatagram;
581    /// use tempfile::tempdir;
582    ///
583    /// // Create an unbound socket
584    /// let tx = UnixDatagram::unbound()?;
585    ///
586    /// // Create another, bound socket
587    /// let tmp = tempdir()?;
588    /// let rx_path = tmp.path().join("rx");
589    /// let rx = UnixDatagram::bind(&rx_path)?;
590    ///
591    /// // Connect to the bound socket
592    /// tx.connect(&rx_path)?;
593    ///
594    /// // Send to the bound socket
595    /// let bytes = b"hello world";
596    /// tx.send(bytes).await?;
597    ///
598    /// let mut buf = vec![0u8; 24];
599    /// let (size, addr) = rx.recv_from(&mut buf).await?;
600    ///
601    /// let dgram = &buf[..size];
602    /// assert_eq!(dgram, bytes);
603    ///
604    /// # Ok(())
605    /// # }
606    /// ```
607    pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
608        self.io.connect(path)
609    }
610
611    /// Sends data on the socket to the socket's peer.
612    ///
613    /// # Cancel safety
614    ///
615    /// This method is cancel safe. If `send` is used as the event in a
616    /// [`tokio::select!`](crate::select) statement and some other branch
617    /// completes first, then it is guaranteed that the message was not sent.
618    ///
619    /// # Examples
620    /// ```
621    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
622    /// # use std::error::Error;
623    /// # #[tokio::main]
624    /// # async fn main() -> Result<(), Box<dyn Error>> {
625    /// use tokio::net::UnixDatagram;
626    ///
627    /// // Create the pair of sockets
628    /// let (sock1, sock2) = UnixDatagram::pair()?;
629    ///
630    /// // Since the sockets are paired, the paired send/recv
631    /// // functions can be used
632    /// let bytes = b"hello world";
633    /// sock1.send(bytes).await?;
634    ///
635    /// let mut buff = vec![0u8; 24];
636    /// let size = sock2.recv(&mut buff).await?;
637    ///
638    /// let dgram = &buff[..size];
639    /// assert_eq!(dgram, bytes);
640    ///
641    /// # Ok(())
642    /// # }
643    /// ```
644    pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
645        self.io
646            .registration()
647            .async_io(Interest::WRITABLE, || self.io.send(buf))
648            .await
649    }
650
651    /// Tries to send a datagram to the peer without waiting.
652    ///
653    /// # Examples
654    ///
655    /// ```no_run
656    /// use tokio::net::UnixDatagram;
657    /// use std::io;
658    ///
659    /// #[tokio::main]
660    /// async fn main() -> io::Result<()> {
661    ///     let dir = tempfile::tempdir().unwrap();
662    ///     let client_path = dir.path().join("client.sock");
663    ///     let server_path = dir.path().join("server.sock");
664    ///     let socket = UnixDatagram::bind(&client_path)?;
665    ///     socket.connect(&server_path)?;
666    ///
667    ///     loop {
668    ///         // Wait for the socket to be writable
669    ///         socket.writable().await?;
670    ///
671    ///         // Try to send data, this may still fail with `WouldBlock`
672    ///         // if the readiness event is a false positive.
673    ///         match socket.try_send(b"hello world") {
674    ///             Ok(n) => {
675    ///                 break;
676    ///             }
677    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
678    ///                 continue;
679    ///             }
680    ///             Err(e) => {
681    ///                 return Err(e);
682    ///             }
683    ///         }
684    ///     }
685    ///
686    ///     Ok(())
687    /// }
688    /// ```
689    pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
690        self.io
691            .registration()
692            .try_io(Interest::WRITABLE, || self.io.send(buf))
693    }
694
695    /// Tries to send a datagram to the peer without waiting.
696    ///
697    /// # Examples
698    ///
699    /// ```no_run
700    /// use tokio::net::UnixDatagram;
701    /// use std::io;
702    ///
703    /// #[tokio::main]
704    /// async fn main() -> io::Result<()> {
705    ///     let dir = tempfile::tempdir().unwrap();
706    ///     let client_path = dir.path().join("client.sock");
707    ///     let server_path = dir.path().join("server.sock");
708    ///     let socket = UnixDatagram::bind(&client_path)?;
709    ///
710    ///     loop {
711    ///         // Wait for the socket to be writable
712    ///         socket.writable().await?;
713    ///
714    ///         // Try to send data, this may still fail with `WouldBlock`
715    ///         // if the readiness event is a false positive.
716    ///         match socket.try_send_to(b"hello world", &server_path) {
717    ///             Ok(n) => {
718    ///                 break;
719    ///             }
720    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
721    ///                 continue;
722    ///             }
723    ///             Err(e) => {
724    ///                 return Err(e);
725    ///             }
726    ///         }
727    ///     }
728    ///
729    ///     Ok(())
730    /// }
731    /// ```
732    pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
733    where
734        P: AsRef<Path>,
735    {
736        self.io
737            .registration()
738            .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
739    }
740
741    /// Receives data from the socket.
742    ///
743    /// # Cancel safety
744    ///
745    /// This method is cancel safe. If `recv` is used as the event in a
746    /// [`tokio::select!`](crate::select) statement and some other branch
747    /// completes first, it is guaranteed that no messages were received on this
748    /// socket.
749    ///
750    /// # Examples
751    /// ```
752    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
753    /// # use std::error::Error;
754    /// # #[tokio::main]
755    /// # async fn main() -> Result<(), Box<dyn Error>> {
756    /// use tokio::net::UnixDatagram;
757    ///
758    /// // Create the pair of sockets
759    /// let (sock1, sock2) = UnixDatagram::pair()?;
760    ///
761    /// // Since the sockets are paired, the paired send/recv
762    /// // functions can be used
763    /// let bytes = b"hello world";
764    /// sock1.send(bytes).await?;
765    ///
766    /// let mut buff = vec![0u8; 24];
767    /// let size = sock2.recv(&mut buff).await?;
768    ///
769    /// let dgram = &buff[..size];
770    /// assert_eq!(dgram, bytes);
771    ///
772    /// # Ok(())
773    /// # }
774    /// ```
775    pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
776        self.io
777            .registration()
778            .async_io(Interest::READABLE, || self.io.recv(buf))
779            .await
780    }
781
782    /// Tries to receive a datagram from the peer without waiting.
783    ///
784    /// # Examples
785    ///
786    /// ```no_run
787    /// use tokio::net::UnixDatagram;
788    /// use std::io;
789    ///
790    /// #[tokio::main]
791    /// async fn main() -> io::Result<()> {
792    ///     // Connect to a peer
793    ///     let dir = tempfile::tempdir().unwrap();
794    ///     let client_path = dir.path().join("client.sock");
795    ///     let server_path = dir.path().join("server.sock");
796    ///     let socket = UnixDatagram::bind(&client_path)?;
797    ///     socket.connect(&server_path)?;
798    ///
799    ///     loop {
800    ///         // Wait for the socket to be readable
801    ///         socket.readable().await?;
802    ///
803    ///         // The buffer is **not** included in the async task and will
804    ///         // only exist on the stack.
805    ///         let mut buf = [0; 1024];
806    ///
807    ///         // Try to recv data, this may still fail with `WouldBlock`
808    ///         // if the readiness event is a false positive.
809    ///         match socket.try_recv(&mut buf) {
810    ///             Ok(n) => {
811    ///                 println!("GOT {:?}", &buf[..n]);
812    ///                 break;
813    ///             }
814    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
815    ///                 continue;
816    ///             }
817    ///             Err(e) => {
818    ///                 return Err(e);
819    ///             }
820    ///         }
821    ///     }
822    ///
823    ///     Ok(())
824    /// }
825    /// ```
826    pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
827        self.io
828            .registration()
829            .try_io(Interest::READABLE, || self.io.recv(buf))
830    }
831
832    cfg_io_util! {
833        /// Tries to receive data from the socket without waiting.
834        ///
835        /// This method can be used even if `buf` is uninitialized.
836        ///
837        /// # Examples
838        ///
839        /// ```no_run
840        /// use tokio::net::UnixDatagram;
841        /// use std::io;
842        ///
843        /// #[tokio::main]
844        /// async fn main() -> io::Result<()> {
845        ///     // Connect to a peer
846        ///     let dir = tempfile::tempdir().unwrap();
847        ///     let client_path = dir.path().join("client.sock");
848        ///     let server_path = dir.path().join("server.sock");
849        ///     let socket = UnixDatagram::bind(&client_path)?;
850        ///
851        ///     loop {
852        ///         // Wait for the socket to be readable
853        ///         socket.readable().await?;
854        ///
855        ///         let mut buf = Vec::with_capacity(1024);
856        ///
857        ///         // Try to recv data, this may still fail with `WouldBlock`
858        ///         // if the readiness event is a false positive.
859        ///         match socket.try_recv_buf_from(&mut buf) {
860        ///             Ok((n, _addr)) => {
861        ///                 println!("GOT {:?}", &buf[..n]);
862        ///                 break;
863        ///             }
864        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
865        ///                 continue;
866        ///             }
867        ///             Err(e) => {
868        ///                 return Err(e);
869        ///             }
870        ///         }
871        ///     }
872        ///
873        ///     Ok(())
874        /// }
875        /// ```
876        pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
877            let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
878                let dst = buf.chunk_mut();
879                let dst =
880                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
881
882                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
883                // buffer.
884                let (n, addr) = (*self.io).recv_from(dst)?;
885
886                unsafe {
887                    buf.advance_mut(n);
888                }
889
890                Ok((n, addr))
891            })?;
892
893            Ok((n, SocketAddr(addr)))
894        }
895
896        /// Receives from the socket, advances the
897        /// buffer's internal cursor and returns how many bytes were read and the origin.
898        ///
899        /// This method can be used even if `buf` is uninitialized.
900        ///
901        /// # Examples
902        /// ```
903        /// # if cfg!(miri) { return } // No `socket` in miri.
904        /// # use std::error::Error;
905        /// # #[tokio::main]
906        /// # async fn main() -> Result<(), Box<dyn Error>> {
907        /// use tokio::net::UnixDatagram;
908        /// use tempfile::tempdir;
909        ///
910        /// // We use a temporary directory so that the socket
911        /// // files left by the bound sockets will get cleaned up.
912        /// let tmp = tempdir()?;
913        ///
914        /// // Bind each socket to a filesystem path
915        /// let tx_path = tmp.path().join("tx");
916        /// let tx = UnixDatagram::bind(&tx_path)?;
917        /// let rx_path = tmp.path().join("rx");
918        /// let rx = UnixDatagram::bind(&rx_path)?;
919        ///
920        /// let bytes = b"hello world";
921        /// tx.send_to(bytes, &rx_path).await?;
922        ///
923        /// let mut buf = Vec::with_capacity(24);
924        /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
925        ///
926        /// let dgram = &buf[..size];
927        /// assert_eq!(dgram, bytes);
928        /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
929        ///
930        /// # Ok(())
931        /// # }
932        /// ```
933        pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
934            self.io.registration().async_io(Interest::READABLE, || {
935                let dst = buf.chunk_mut();
936                let dst =
937                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
938
939                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
940                // buffer.
941                let (n, addr) = (*self.io).recv_from(dst)?;
942
943                unsafe {
944                    buf.advance_mut(n);
945                }
946                Ok((n,SocketAddr(addr)))
947            }).await
948        }
949
950        /// Tries to read data from the stream into the provided buffer, advancing the
951        /// buffer's internal cursor, returning how many bytes were read.
952        ///
953        /// This method can be used even if `buf` is uninitialized.
954        ///
955        /// # Examples
956        ///
957        /// ```no_run
958        /// use tokio::net::UnixDatagram;
959        /// use std::io;
960        ///
961        /// #[tokio::main]
962        /// async fn main() -> io::Result<()> {
963        ///     // Connect to a peer
964        ///     let dir = tempfile::tempdir().unwrap();
965        ///     let client_path = dir.path().join("client.sock");
966        ///     let server_path = dir.path().join("server.sock");
967        ///     let socket = UnixDatagram::bind(&client_path)?;
968        ///     socket.connect(&server_path)?;
969        ///
970        ///     loop {
971        ///         // Wait for the socket to be readable
972        ///         socket.readable().await?;
973        ///
974        ///         let mut buf = Vec::with_capacity(1024);
975        ///
976        ///         // Try to recv data, this may still fail with `WouldBlock`
977        ///         // if the readiness event is a false positive.
978        ///         match socket.try_recv_buf(&mut buf) {
979        ///             Ok(n) => {
980        ///                 println!("GOT {:?}", &buf[..n]);
981        ///                 break;
982        ///             }
983        ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
984        ///                 continue;
985        ///             }
986        ///             Err(e) => {
987        ///                 return Err(e);
988        ///             }
989        ///         }
990        ///     }
991        ///
992        ///     Ok(())
993        /// }
994        /// ```
995        pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
996            self.io.registration().try_io(Interest::READABLE, || {
997                let dst = buf.chunk_mut();
998                let dst =
999                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1000
1001                // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
1002                // buffer.
1003                let n = (*self.io).recv(dst)?;
1004
1005                unsafe {
1006                    buf.advance_mut(n);
1007                }
1008
1009                Ok(n)
1010            })
1011        }
1012
1013        /// Receives data from the socket from the address to which it is connected,
1014        /// advancing the buffer's internal cursor, returning how many bytes were read.
1015        ///
1016        /// This method can be used even if `buf` is uninitialized.
1017        ///
1018        /// # Examples
1019        /// ```
1020        /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1021        /// # use std::error::Error;
1022        /// # #[tokio::main]
1023        /// # async fn main() -> Result<(), Box<dyn Error>> {
1024        /// use tokio::net::UnixDatagram;
1025        ///
1026        /// // Create the pair of sockets
1027        /// let (sock1, sock2) = UnixDatagram::pair()?;
1028        ///
1029        /// // Since the sockets are paired, the paired send/recv
1030        /// // functions can be used
1031        /// let bytes = b"hello world";
1032        /// sock1.send(bytes).await?;
1033        ///
1034        /// let mut buff = Vec::with_capacity(24);
1035        /// let size = sock2.recv_buf(&mut buff).await?;
1036        ///
1037        /// let dgram = &buff[..size];
1038        /// assert_eq!(dgram, bytes);
1039        ///
1040        /// # Ok(())
1041        /// # }
1042        /// ```
1043        pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1044            self.io.registration().async_io(Interest::READABLE, || {
1045                let dst = buf.chunk_mut();
1046                let dst =
1047                    unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1048
1049                // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1050                // buffer.
1051                let n = (*self.io).recv(dst)?;
1052
1053                unsafe {
1054                    buf.advance_mut(n);
1055                }
1056                Ok(n)
1057            }).await
1058        }
1059    }
1060
1061    /// Sends data on the socket to the specified address.
1062    ///
1063    /// # Cancel safety
1064    ///
1065    /// This method is cancel safe. If `send_to` is used as the event in a
1066    /// [`tokio::select!`](crate::select) statement and some other branch
1067    /// completes first, then it is guaranteed that the message was not sent.
1068    ///
1069    /// # Examples
1070    /// ```
1071    /// # if cfg!(miri) { return } // No `socket` in miri.
1072    /// # use std::error::Error;
1073    /// # #[tokio::main]
1074    /// # async fn main() -> Result<(), Box<dyn Error>> {
1075    /// use tokio::net::UnixDatagram;
1076    /// use tempfile::tempdir;
1077    ///
1078    /// // We use a temporary directory so that the socket
1079    /// // files left by the bound sockets will get cleaned up.
1080    /// let tmp = tempdir()?;
1081    ///
1082    /// // Bind each socket to a filesystem path
1083    /// let tx_path = tmp.path().join("tx");
1084    /// let tx = UnixDatagram::bind(&tx_path)?;
1085    /// let rx_path = tmp.path().join("rx");
1086    /// let rx = UnixDatagram::bind(&rx_path)?;
1087    ///
1088    /// let bytes = b"hello world";
1089    /// tx.send_to(bytes, &rx_path).await?;
1090    ///
1091    /// let mut buf = vec![0u8; 24];
1092    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1093    ///
1094    /// let dgram = &buf[..size];
1095    /// assert_eq!(dgram, bytes);
1096    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1097    ///
1098    /// # Ok(())
1099    /// # }
1100    /// ```
1101    pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1102    where
1103        P: AsRef<Path>,
1104    {
1105        self.io
1106            .registration()
1107            .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1108            .await
1109    }
1110
1111    /// Receives data from the socket.
1112    ///
1113    /// # Cancel safety
1114    ///
1115    /// This method is cancel safe. If `recv_from` is used as the event in a
1116    /// [`tokio::select!`](crate::select) statement and some other branch
1117    /// completes first, it is guaranteed that no messages were received on this
1118    /// socket.
1119    ///
1120    /// # Examples
1121    /// ```
1122    /// # if cfg!(miri) { return } // No `socket` in miri.
1123    /// # use std::error::Error;
1124    /// # #[tokio::main]
1125    /// # async fn main() -> Result<(), Box<dyn Error>> {
1126    /// use tokio::net::UnixDatagram;
1127    /// use tempfile::tempdir;
1128    ///
1129    /// // We use a temporary directory so that the socket
1130    /// // files left by the bound sockets will get cleaned up.
1131    /// let tmp = tempdir()?;
1132    ///
1133    /// // Bind each socket to a filesystem path
1134    /// let tx_path = tmp.path().join("tx");
1135    /// let tx = UnixDatagram::bind(&tx_path)?;
1136    /// let rx_path = tmp.path().join("rx");
1137    /// let rx = UnixDatagram::bind(&rx_path)?;
1138    ///
1139    /// let bytes = b"hello world";
1140    /// tx.send_to(bytes, &rx_path).await?;
1141    ///
1142    /// let mut buf = vec![0u8; 24];
1143    /// let (size, addr) = rx.recv_from(&mut buf).await?;
1144    ///
1145    /// let dgram = &buf[..size];
1146    /// assert_eq!(dgram, bytes);
1147    /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1148    ///
1149    /// # Ok(())
1150    /// # }
1151    /// ```
1152    pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1153        let (n, addr) = self
1154            .io
1155            .registration()
1156            .async_io(Interest::READABLE, || self.io.recv_from(buf))
1157            .await?;
1158
1159        Ok((n, SocketAddr(addr)))
1160    }
1161
1162    /// Attempts to receive a single datagram on the specified address.
1163    ///
1164    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1165    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1166    /// receive a wakeup.
1167    ///
1168    /// # Return value
1169    ///
1170    /// The function returns:
1171    ///
1172    /// * `Poll::Pending` if the socket is not ready to read
1173    /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1174    /// * `Poll::Ready(Err(e))` if an error is encountered.
1175    ///
1176    /// # Errors
1177    ///
1178    /// This function may encounter any standard I/O error except `WouldBlock`.
1179    pub fn poll_recv_from(
1180        &self,
1181        cx: &mut Context<'_>,
1182        buf: &mut ReadBuf<'_>,
1183    ) -> Poll<io::Result<SocketAddr>> {
1184        #[allow(clippy::blocks_in_conditions)]
1185        let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1186            // Safety: will not read the maybe uninitialized bytes.
1187            let b = unsafe {
1188                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1189            };
1190
1191            self.io.recv_from(b)
1192        }))?;
1193
1194        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1195        unsafe {
1196            buf.assume_init(n);
1197        }
1198        buf.advance(n);
1199        Poll::Ready(Ok(SocketAddr(addr)))
1200    }
1201
1202    /// Attempts to send data to the specified address.
1203    ///
1204    /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1205    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1206    /// receive a wakeup.
1207    ///
1208    /// # Return value
1209    ///
1210    /// The function returns:
1211    ///
1212    /// * `Poll::Pending` if the socket is not ready to write
1213    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1214    /// * `Poll::Ready(Err(e))` if an error is encountered.
1215    ///
1216    /// # Errors
1217    ///
1218    /// This function may encounter any standard I/O error except `WouldBlock`.
1219    pub fn poll_send_to<P>(
1220        &self,
1221        cx: &mut Context<'_>,
1222        buf: &[u8],
1223        target: P,
1224    ) -> Poll<io::Result<usize>>
1225    where
1226        P: AsRef<Path>,
1227    {
1228        self.io
1229            .registration()
1230            .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1231    }
1232
1233    /// Attempts to send data on the socket to the remote address to which it
1234    /// was previously `connect`ed.
1235    ///
1236    /// The [`connect`] method will connect this socket to a remote address.
1237    /// This method will fail if the socket is not connected.
1238    ///
1239    /// Note that on multiple calls to a `poll_*` method in the send direction,
1240    /// only the `Waker` from the `Context` passed to the most recent call will
1241    /// be scheduled to receive a wakeup.
1242    ///
1243    /// # Return value
1244    ///
1245    /// The function returns:
1246    ///
1247    /// * `Poll::Pending` if the socket is not available to write
1248    /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1249    /// * `Poll::Ready(Err(e))` if an error is encountered.
1250    ///
1251    /// # Errors
1252    ///
1253    /// This function may encounter any standard I/O error except `WouldBlock`.
1254    ///
1255    /// [`connect`]: method@Self::connect
1256    pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1257        self.io
1258            .registration()
1259            .poll_write_io(cx, || self.io.send(buf))
1260    }
1261
1262    /// Attempts to receive a single datagram message on the socket from the remote
1263    /// address to which it is `connect`ed.
1264    ///
1265    /// The [`connect`] method will connect this socket to a remote address. This method
1266    /// resolves to an error if the socket is not connected.
1267    ///
1268    /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1269    /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1270    /// receive a wakeup.
1271    ///
1272    /// # Return value
1273    ///
1274    /// The function returns:
1275    ///
1276    /// * `Poll::Pending` if the socket is not ready to read
1277    /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
1278    /// * `Poll::Ready(Err(e))` if an error is encountered.
1279    ///
1280    /// # Errors
1281    ///
1282    /// This function may encounter any standard I/O error except `WouldBlock`.
1283    ///
1284    /// [`connect`]: method@Self::connect
1285    pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1286        #[allow(clippy::blocks_in_conditions)]
1287        let n = ready!(self.io.registration().poll_read_io(cx, || {
1288            // Safety: will not read the maybe uninitialized bytes.
1289            let b = unsafe {
1290                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1291            };
1292
1293            self.io.recv(b)
1294        }))?;
1295
1296        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1297        unsafe {
1298            buf.assume_init(n);
1299        }
1300        buf.advance(n);
1301        Poll::Ready(Ok(()))
1302    }
1303
1304    /// Tries to receive data from the socket without waiting.
1305    ///
1306    /// # Examples
1307    ///
1308    /// ```no_run
1309    /// use tokio::net::UnixDatagram;
1310    /// use std::io;
1311    ///
1312    /// #[tokio::main]
1313    /// async fn main() -> io::Result<()> {
1314    ///     // Connect to a peer
1315    ///     let dir = tempfile::tempdir().unwrap();
1316    ///     let client_path = dir.path().join("client.sock");
1317    ///     let server_path = dir.path().join("server.sock");
1318    ///     let socket = UnixDatagram::bind(&client_path)?;
1319    ///
1320    ///     loop {
1321    ///         // Wait for the socket to be readable
1322    ///         socket.readable().await?;
1323    ///
1324    ///         // The buffer is **not** included in the async task and will
1325    ///         // only exist on the stack.
1326    ///         let mut buf = [0; 1024];
1327    ///
1328    ///         // Try to recv data, this may still fail with `WouldBlock`
1329    ///         // if the readiness event is a false positive.
1330    ///         match socket.try_recv_from(&mut buf) {
1331    ///             Ok((n, _addr)) => {
1332    ///                 println!("GOT {:?}", &buf[..n]);
1333    ///                 break;
1334    ///             }
1335    ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1336    ///                 continue;
1337    ///             }
1338    ///             Err(e) => {
1339    ///                 return Err(e);
1340    ///             }
1341    ///         }
1342    ///     }
1343    ///
1344    ///     Ok(())
1345    /// }
1346    /// ```
1347    pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1348        let (n, addr) = self
1349            .io
1350            .registration()
1351            .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1352
1353        Ok((n, SocketAddr(addr)))
1354    }
1355
1356    /// Tries to read or write from the socket using a user-provided IO operation.
1357    ///
1358    /// If the socket is ready, the provided closure is called. The closure
1359    /// should attempt to perform IO operation on the socket by manually
1360    /// calling the appropriate syscall. If the operation fails because the
1361    /// socket is not actually ready, then the closure should return a
1362    /// `WouldBlock` error and the readiness flag is cleared. The return value
1363    /// of the closure is then returned by `try_io`.
1364    ///
1365    /// If the socket is not ready, then the closure is not called
1366    /// and a `WouldBlock` error is returned.
1367    ///
1368    /// The closure should only return a `WouldBlock` error if it has performed
1369    /// an IO operation on the socket that failed due to the socket not being
1370    /// ready. Returning a `WouldBlock` error in any other situation will
1371    /// incorrectly clear the readiness flag, which can cause the socket to
1372    /// behave incorrectly.
1373    ///
1374    /// The closure should not perform the IO operation using any of the methods
1375    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1376    /// readiness flag and can cause the socket to behave incorrectly.
1377    ///
1378    /// This method is not intended to be used with combined interests.
1379    /// The closure should perform only one type of IO operation, so it should not
1380    /// require more than one ready state. This method may panic or sleep forever
1381    /// if it is called with a combined interest.
1382    ///
1383    /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1384    ///
1385    /// [`readable()`]: UnixDatagram::readable()
1386    /// [`writable()`]: UnixDatagram::writable()
1387    /// [`ready()`]: UnixDatagram::ready()
1388    pub fn try_io<R>(
1389        &self,
1390        interest: Interest,
1391        f: impl FnOnce() -> io::Result<R>,
1392    ) -> io::Result<R> {
1393        self.io
1394            .registration()
1395            .try_io(interest, || self.io.try_io(f))
1396    }
1397
1398    /// Reads or writes from the socket using a user-provided IO operation.
1399    ///
1400    /// The readiness of the socket is awaited and when the socket is ready,
1401    /// the provided closure is called. The closure should attempt to perform
1402    /// IO operation on the socket by manually calling the appropriate syscall.
1403    /// If the operation fails because the socket is not actually ready,
1404    /// then the closure should return a `WouldBlock` error. In such case the
1405    /// readiness flag is cleared and the socket readiness is awaited again.
1406    /// This loop is repeated until the closure returns an `Ok` or an error
1407    /// other than `WouldBlock`.
1408    ///
1409    /// The closure should only return a `WouldBlock` error if it has performed
1410    /// an IO operation on the socket that failed due to the socket not being
1411    /// ready. Returning a `WouldBlock` error in any other situation will
1412    /// incorrectly clear the readiness flag, which can cause the socket to
1413    /// behave incorrectly.
1414    ///
1415    /// The closure should not perform the IO operation using any of the methods
1416    /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1417    /// readiness flag and can cause the socket to behave incorrectly.
1418    ///
1419    /// This method is not intended to be used with combined interests.
1420    /// The closure should perform only one type of IO operation, so it should not
1421    /// require more than one ready state. This method may panic or sleep forever
1422    /// if it is called with a combined interest.
1423    pub async fn async_io<R>(
1424        &self,
1425        interest: Interest,
1426        mut f: impl FnMut() -> io::Result<R>,
1427    ) -> io::Result<R> {
1428        self.io
1429            .registration()
1430            .async_io(interest, || self.io.try_io(&mut f))
1431            .await
1432    }
1433
1434    /// Returns the local address that this socket is bound to.
1435    ///
1436    /// # Examples
1437    /// For a socket bound to a local path
1438    /// ```
1439    /// # if cfg!(miri) { return } // No `socket` in miri.
1440    /// # use std::error::Error;
1441    /// # #[tokio::main]
1442    /// # async fn main() -> Result<(), Box<dyn Error>> {
1443    /// use tokio::net::UnixDatagram;
1444    /// use tempfile::tempdir;
1445    ///
1446    /// // We use a temporary directory so that the socket
1447    /// // files left by the bound sockets will get cleaned up.
1448    /// let tmp = tempdir()?;
1449    ///
1450    /// // Bind socket to a filesystem path
1451    /// let socket_path = tmp.path().join("socket");
1452    /// let socket = UnixDatagram::bind(&socket_path)?;
1453    ///
1454    /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1455    ///
1456    /// # Ok(())
1457    /// # }
1458    /// ```
1459    ///
1460    /// For an unbound socket
1461    /// ```
1462    /// # if cfg!(miri) { return } // No `socket` in miri.
1463    /// # use std::error::Error;
1464    /// # #[tokio::main]
1465    /// # async fn main() -> Result<(), Box<dyn Error>> {
1466    /// use tokio::net::UnixDatagram;
1467    ///
1468    /// // Create an unbound socket
1469    /// let socket = UnixDatagram::unbound()?;
1470    ///
1471    /// assert!(socket.local_addr()?.is_unnamed());
1472    ///
1473    /// # Ok(())
1474    /// # }
1475    /// ```
1476    pub fn local_addr(&self) -> io::Result<SocketAddr> {
1477        self.io.local_addr().map(SocketAddr)
1478    }
1479
1480    /// Returns the address of this socket's peer.
1481    ///
1482    /// The `connect` method will connect the socket to a peer.
1483    ///
1484    /// # Examples
1485    /// For a peer with a local path
1486    /// ```
1487    /// # if cfg!(miri) { return } // No `socket` in miri.
1488    /// # use std::error::Error;
1489    /// # #[tokio::main]
1490    /// # async fn main() -> Result<(), Box<dyn Error>> {
1491    /// use tokio::net::UnixDatagram;
1492    /// use tempfile::tempdir;
1493    ///
1494    /// // Create an unbound socket
1495    /// let tx = UnixDatagram::unbound()?;
1496    ///
1497    /// // Create another, bound socket
1498    /// let tmp = tempdir()?;
1499    /// let rx_path = tmp.path().join("rx");
1500    /// let rx = UnixDatagram::bind(&rx_path)?;
1501    ///
1502    /// // Connect to the bound socket
1503    /// tx.connect(&rx_path)?;
1504    ///
1505    /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1506    ///
1507    /// # Ok(())
1508    /// # }
1509    /// ```
1510    ///
1511    /// For an unbound peer
1512    /// ```
1513    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1514    /// # use std::error::Error;
1515    /// # #[tokio::main]
1516    /// # async fn main() -> Result<(), Box<dyn Error>> {
1517    /// use tokio::net::UnixDatagram;
1518    ///
1519    /// // Create the pair of sockets
1520    /// let (sock1, sock2) = UnixDatagram::pair()?;
1521    ///
1522    /// assert!(sock1.peer_addr()?.is_unnamed());
1523    ///
1524    /// # Ok(())
1525    /// # }
1526    /// ```
1527    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1528        self.io.peer_addr().map(SocketAddr)
1529    }
1530
1531    /// Returns the value of the `SO_ERROR` option.
1532    ///
1533    /// # Examples
1534    /// ```
1535    /// # if cfg!(miri) { return } // No `socket` in miri.
1536    /// # use std::error::Error;
1537    /// # #[tokio::main]
1538    /// # async fn main() -> Result<(), Box<dyn Error>> {
1539    /// use tokio::net::UnixDatagram;
1540    ///
1541    /// // Create an unbound socket
1542    /// let socket = UnixDatagram::unbound()?;
1543    ///
1544    /// if let Ok(Some(err)) = socket.take_error() {
1545    ///     println!("Got error: {:?}", err);
1546    /// }
1547    ///
1548    /// # Ok(())
1549    /// # }
1550    /// ```
1551    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1552        self.io.take_error()
1553    }
1554
1555    /// Shuts down the read, write, or both halves of this connection.
1556    ///
1557    /// This function will cause all pending and future I/O calls on the
1558    /// specified portions to immediately return with an appropriate value
1559    /// (see the documentation of `Shutdown`).
1560    ///
1561    /// # Examples
1562    /// ```
1563    /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1564    /// # use std::error::Error;
1565    /// # #[tokio::main]
1566    /// # async fn main() -> Result<(), Box<dyn Error>> {
1567    /// use tokio::net::UnixDatagram;
1568    /// use std::net::Shutdown;
1569    ///
1570    /// // Create an unbound socket
1571    /// let (socket, other) = UnixDatagram::pair()?;
1572    ///
1573    /// socket.shutdown(Shutdown::Both)?;
1574    ///
1575    /// // NOTE: the following commented out code does NOT work as expected.
1576    /// // Due to an underlying issue, the recv call will block indefinitely.
1577    /// // See: https://github.com/tokio-rs/tokio/issues/1679
1578    /// //let mut buff = vec![0u8; 24];
1579    /// //let size = socket.recv(&mut buff).await?;
1580    /// //assert_eq!(size, 0);
1581    ///
1582    /// let send_result = socket.send(b"hello world").await;
1583    /// assert!(send_result.is_err());
1584    ///
1585    /// # Ok(())
1586    /// # }
1587    /// ```
1588    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1589        self.io.shutdown(how)
1590    }
1591}
1592
1593impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1594    type Error = io::Error;
1595
1596    /// Consumes stream, returning the Tokio I/O object.
1597    ///
1598    /// This is equivalent to
1599    /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1600    fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1601        Self::from_std(stream)
1602    }
1603}
1604
1605impl fmt::Debug for UnixDatagram {
1606    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1607        self.io.fmt(f)
1608    }
1609}
1610
1611impl AsRawFd for UnixDatagram {
1612    fn as_raw_fd(&self) -> RawFd {
1613        self.io.as_raw_fd()
1614    }
1615}
1616
1617impl AsFd for UnixDatagram {
1618    fn as_fd(&self) -> BorrowedFd<'_> {
1619        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1620    }
1621}