tokio/net/unix/datagram/socket.rs
1use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2use crate::net::unix::SocketAddr;
3use crate::util::check_socket_for_blocking;
4
5use std::fmt;
6use std::io;
7use std::net::Shutdown;
8use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
9use std::os::unix::net;
10use std::path::Path;
11use std::task::{ready, Context, Poll};
12
13cfg_io_util! {
14 use bytes::BufMut;
15}
16
17cfg_net_unix! {
18 /// An I/O object representing a Unix datagram socket.
19 ///
20 /// A socket can be either named (associated with a filesystem path) or
21 /// unnamed.
22 ///
23 /// This type does not provide a `split` method, because this functionality
24 /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
25 /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
26 /// is enough. This is because all of the methods take `&self` instead of
27 /// `&mut self`.
28 ///
29 /// **Note:** named sockets are persisted even after the object is dropped
30 /// and the program has exited, and cannot be reconnected. It is advised
31 /// that you either check for and unlink the existing socket if it exists,
32 /// or use a temporary file that is guaranteed to not already exist.
33 ///
34 /// [`Arc`]: std::sync::Arc
35 ///
36 /// # Examples
37 /// Using named sockets, associated with a filesystem path:
38 /// ```
39 /// # if cfg!(miri) { return } // No `socket` in miri.
40 /// # use std::error::Error;
41 /// # #[tokio::main]
42 /// # async fn main() -> Result<(), Box<dyn Error>> {
43 /// use tokio::net::UnixDatagram;
44 /// use tempfile::tempdir;
45 ///
46 /// // We use a temporary directory so that the socket
47 /// // files left by the bound sockets will get cleaned up.
48 /// let tmp = tempdir()?;
49 ///
50 /// // Bind each socket to a filesystem path
51 /// let tx_path = tmp.path().join("tx");
52 /// let tx = UnixDatagram::bind(&tx_path)?;
53 /// let rx_path = tmp.path().join("rx");
54 /// let rx = UnixDatagram::bind(&rx_path)?;
55 ///
56 /// let bytes = b"hello world";
57 /// tx.send_to(bytes, &rx_path).await?;
58 ///
59 /// let mut buf = vec![0u8; 24];
60 /// let (size, addr) = rx.recv_from(&mut buf).await?;
61 ///
62 /// let dgram = &buf[..size];
63 /// assert_eq!(dgram, bytes);
64 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
65 ///
66 /// # Ok(())
67 /// # }
68 /// ```
69 ///
70 /// Using unnamed sockets, created as a pair
71 /// ```
72 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
73 /// # use std::error::Error;
74 /// # #[tokio::main]
75 /// # async fn main() -> Result<(), Box<dyn Error>> {
76 /// use tokio::net::UnixDatagram;
77 ///
78 /// // Create the pair of sockets
79 /// let (sock1, sock2) = UnixDatagram::pair()?;
80 ///
81 /// // Since the sockets are paired, the paired send/recv
82 /// // functions can be used
83 /// let bytes = b"hello world";
84 /// sock1.send(bytes).await?;
85 ///
86 /// let mut buff = vec![0u8; 24];
87 /// let size = sock2.recv(&mut buff).await?;
88 ///
89 /// let dgram = &buff[..size];
90 /// assert_eq!(dgram, bytes);
91 ///
92 /// # Ok(())
93 /// # }
94 /// ```
95 #[cfg_attr(docsrs, doc(alias = "uds"))]
96 pub struct UnixDatagram {
97 io: PollEvented<mio::net::UnixDatagram>,
98 }
99}
100
101impl UnixDatagram {
102 pub(crate) fn from_mio(sys: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
103 let datagram = UnixDatagram::new(sys)?;
104
105 if let Some(e) = datagram.io.take_error()? {
106 return Err(e);
107 }
108
109 Ok(datagram)
110 }
111
112 /// Waits for any of the requested ready states.
113 ///
114 /// This function is usually paired with `try_recv()` or `try_send()`. It
115 /// can be used to concurrently `recv` / `send` to the same socket on a single
116 /// task without splitting the socket.
117 ///
118 /// The function may complete without the socket being ready. This is a
119 /// false-positive and attempting an operation will return with
120 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
121 /// [`Ready`] set, so you should always check the returned value and possibly
122 /// wait again if the requested states are not set.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. Once a readiness event occurs, the method
127 /// will continue to return immediately until the readiness event is
128 /// consumed by an attempt to read or write that fails with `WouldBlock` or
129 /// `Poll::Pending`.
130 ///
131 /// # Examples
132 ///
133 /// Concurrently receive from and send to the socket on the same task
134 /// without splitting.
135 ///
136 /// ```no_run
137 /// use tokio::io::Interest;
138 /// use tokio::net::UnixDatagram;
139 /// use std::io;
140 ///
141 /// #[tokio::main]
142 /// async fn main() -> io::Result<()> {
143 /// let dir = tempfile::tempdir().unwrap();
144 /// let client_path = dir.path().join("client.sock");
145 /// let server_path = dir.path().join("server.sock");
146 /// let socket = UnixDatagram::bind(&client_path)?;
147 /// socket.connect(&server_path)?;
148 ///
149 /// loop {
150 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
151 ///
152 /// if ready.is_readable() {
153 /// let mut data = [0; 1024];
154 /// match socket.try_recv(&mut data[..]) {
155 /// Ok(n) => {
156 /// println!("received {:?}", &data[..n]);
157 /// }
158 /// // False-positive, continue
159 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
160 /// Err(e) => {
161 /// return Err(e);
162 /// }
163 /// }
164 /// }
165 ///
166 /// if ready.is_writable() {
167 /// // Write some data
168 /// match socket.try_send(b"hello world") {
169 /// Ok(n) => {
170 /// println!("sent {} bytes", n);
171 /// }
172 /// // False-positive, continue
173 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
174 /// Err(e) => {
175 /// return Err(e);
176 /// }
177 /// }
178 /// }
179 /// }
180 /// }
181 /// ```
182 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
183 let event = self.io.registration().readiness(interest).await?;
184 Ok(event.ready)
185 }
186
187 /// Waits for the socket to become writable.
188 ///
189 /// This function is equivalent to `ready(Interest::WRITABLE)` and is
190 /// usually paired with `try_send()` or `try_send_to()`.
191 ///
192 /// The function may complete without the socket being writable. This is a
193 /// false-positive and attempting a `try_send()` will return with
194 /// `io::ErrorKind::WouldBlock`.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. Once a readiness event occurs, the method
199 /// will continue to return immediately until the readiness event is
200 /// consumed by an attempt to write that fails with `WouldBlock` or
201 /// `Poll::Pending`.
202 ///
203 /// # Examples
204 ///
205 /// ```no_run
206 /// use tokio::net::UnixDatagram;
207 /// use std::io;
208 ///
209 /// #[tokio::main]
210 /// async fn main() -> io::Result<()> {
211 /// let dir = tempfile::tempdir().unwrap();
212 /// let client_path = dir.path().join("client.sock");
213 /// let server_path = dir.path().join("server.sock");
214 /// let socket = UnixDatagram::bind(&client_path)?;
215 /// socket.connect(&server_path)?;
216 ///
217 /// loop {
218 /// // Wait for the socket to be writable
219 /// socket.writable().await?;
220 ///
221 /// // Try to send data, this may still fail with `WouldBlock`
222 /// // if the readiness event is a false positive.
223 /// match socket.try_send(b"hello world") {
224 /// Ok(n) => {
225 /// break;
226 /// }
227 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
228 /// continue;
229 /// }
230 /// Err(e) => {
231 /// return Err(e);
232 /// }
233 /// }
234 /// }
235 ///
236 /// Ok(())
237 /// }
238 /// ```
239 pub async fn writable(&self) -> io::Result<()> {
240 self.ready(Interest::WRITABLE).await?;
241 Ok(())
242 }
243
244 /// Polls for write/send readiness.
245 ///
246 /// If the socket is not currently ready for sending, this method will
247 /// store a clone of the `Waker` from the provided `Context`. When the socket
248 /// becomes ready for sending, `Waker::wake` will be called on the
249 /// waker.
250 ///
251 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
252 /// the `Waker` from the `Context` passed to the most recent call is
253 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
254 /// second, independent waker.)
255 ///
256 /// This function is intended for cases where creating and pinning a future
257 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
258 /// preferred, as this supports polling from multiple tasks at once.
259 ///
260 /// # Return value
261 ///
262 /// The function returns:
263 ///
264 /// * `Poll::Pending` if the socket is not ready for writing.
265 /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
266 /// * `Poll::Ready(Err(e))` if an error is encountered.
267 ///
268 /// # Errors
269 ///
270 /// This function may encounter any standard I/O error except `WouldBlock`.
271 ///
272 /// [`writable`]: method@Self::writable
273 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
274 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
275 }
276
277 /// Waits for the socket to become readable.
278 ///
279 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
280 /// paired with `try_recv()`.
281 ///
282 /// The function may complete without the socket being readable. This is a
283 /// false-positive and attempting a `try_recv()` will return with
284 /// `io::ErrorKind::WouldBlock`.
285 ///
286 /// # Cancel safety
287 ///
288 /// This method is cancel safe. Once a readiness event occurs, the method
289 /// will continue to return immediately until the readiness event is
290 /// consumed by an attempt to read that fails with `WouldBlock` or
291 /// `Poll::Pending`.
292 ///
293 /// # Examples
294 ///
295 /// ```no_run
296 /// use tokio::net::UnixDatagram;
297 /// use std::io;
298 ///
299 /// #[tokio::main]
300 /// async fn main() -> io::Result<()> {
301 /// // Connect to a peer
302 /// let dir = tempfile::tempdir().unwrap();
303 /// let client_path = dir.path().join("client.sock");
304 /// let server_path = dir.path().join("server.sock");
305 /// let socket = UnixDatagram::bind(&client_path)?;
306 /// socket.connect(&server_path)?;
307 ///
308 /// loop {
309 /// // Wait for the socket to be readable
310 /// socket.readable().await?;
311 ///
312 /// // The buffer is **not** included in the async task and will
313 /// // only exist on the stack.
314 /// let mut buf = [0; 1024];
315 ///
316 /// // Try to recv data, this may still fail with `WouldBlock`
317 /// // if the readiness event is a false positive.
318 /// match socket.try_recv(&mut buf) {
319 /// Ok(n) => {
320 /// println!("GOT {:?}", &buf[..n]);
321 /// break;
322 /// }
323 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
324 /// continue;
325 /// }
326 /// Err(e) => {
327 /// return Err(e);
328 /// }
329 /// }
330 /// }
331 ///
332 /// Ok(())
333 /// }
334 /// ```
335 pub async fn readable(&self) -> io::Result<()> {
336 self.ready(Interest::READABLE).await?;
337 Ok(())
338 }
339
340 /// Polls for read/receive readiness.
341 ///
342 /// If the socket is not currently ready for receiving, this method will
343 /// store a clone of the `Waker` from the provided `Context`. When the
344 /// socket becomes ready for reading, `Waker::wake` will be called on the
345 /// waker.
346 ///
347 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
348 /// `poll_peek`, only the `Waker` from the `Context` passed to the most
349 /// recent call is scheduled to receive a wakeup. (However,
350 /// `poll_send_ready` retains a second, independent waker.)
351 ///
352 /// This function is intended for cases where creating and pinning a future
353 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
354 /// preferred, as this supports polling from multiple tasks at once.
355 ///
356 /// # Return value
357 ///
358 /// The function returns:
359 ///
360 /// * `Poll::Pending` if the socket is not ready for reading.
361 /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
362 /// * `Poll::Ready(Err(e))` if an error is encountered.
363 ///
364 /// # Errors
365 ///
366 /// This function may encounter any standard I/O error except `WouldBlock`.
367 ///
368 /// [`readable`]: method@Self::readable
369 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
370 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
371 }
372
373 /// Creates a new `UnixDatagram` bound to the specified path.
374 ///
375 /// # Examples
376 /// ```
377 /// # if cfg!(miri) { return } // No `socket` in miri.
378 /// # use std::error::Error;
379 /// # #[tokio::main]
380 /// # async fn main() -> Result<(), Box<dyn Error>> {
381 /// use tokio::net::UnixDatagram;
382 /// use tempfile::tempdir;
383 ///
384 /// // We use a temporary directory so that the socket
385 /// // files left by the bound sockets will get cleaned up.
386 /// let tmp = tempdir()?;
387 ///
388 /// // Bind the socket to a filesystem path
389 /// let socket_path = tmp.path().join("socket");
390 /// let socket = UnixDatagram::bind(&socket_path)?;
391 ///
392 /// # Ok(())
393 /// # }
394 /// ```
395 pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
396 where
397 P: AsRef<Path>,
398 {
399 let socket = mio::net::UnixDatagram::bind(path)?;
400 UnixDatagram::new(socket)
401 }
402
403 /// Creates an unnamed pair of connected sockets.
404 ///
405 /// This function will create a pair of interconnected Unix sockets for
406 /// communicating back and forth between one another.
407 ///
408 /// # Examples
409 /// ```
410 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
411 /// # use std::error::Error;
412 /// # #[tokio::main]
413 /// # async fn main() -> Result<(), Box<dyn Error>> {
414 /// use tokio::net::UnixDatagram;
415 ///
416 /// // Create the pair of sockets
417 /// let (sock1, sock2) = UnixDatagram::pair()?;
418 ///
419 /// // Since the sockets are paired, the paired send/recv
420 /// // functions can be used
421 /// let bytes = b"hail eris";
422 /// sock1.send(bytes).await?;
423 ///
424 /// let mut buff = vec![0u8; 24];
425 /// let size = sock2.recv(&mut buff).await?;
426 ///
427 /// let dgram = &buff[..size];
428 /// assert_eq!(dgram, bytes);
429 ///
430 /// # Ok(())
431 /// # }
432 /// ```
433 pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
434 let (a, b) = mio::net::UnixDatagram::pair()?;
435 let a = UnixDatagram::new(a)?;
436 let b = UnixDatagram::new(b)?;
437
438 Ok((a, b))
439 }
440
441 /// Creates new [`UnixDatagram`] from a [`std::os::unix::net::UnixDatagram`].
442 ///
443 /// This function is intended to be used to wrap a `UnixDatagram` from the
444 /// standard library in the Tokio equivalent.
445 ///
446 /// # Notes
447 ///
448 /// The caller is responsible for ensuring that the socket is in
449 /// non-blocking mode. Otherwise all I/O operations on the socket
450 /// will block the thread, which will cause unexpected behavior.
451 /// Non-blocking mode can be set using [`set_nonblocking`].
452 ///
/// Passing a socket in blocking mode is always erroneous,
454 /// and the behavior in that case may change in the future.
455 /// For example, it could panic.
456 ///
457 /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
458 ///
459 /// # Panics
460 ///
461 /// This function panics if it is not called from within a runtime with
462 /// IO enabled.
463 ///
464 /// The runtime is usually set implicitly when this function is called
465 /// from a future driven by a Tokio runtime, otherwise runtime can be set
466 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
///
/// # Examples
468 /// ```
469 /// # if cfg!(miri) { return } // No `socket` in miri.
470 /// # use std::error::Error;
471 /// # #[tokio::main]
472 /// # async fn main() -> Result<(), Box<dyn Error>> {
473 /// use tokio::net::UnixDatagram;
474 /// use std::os::unix::net::UnixDatagram as StdUDS;
475 /// use tempfile::tempdir;
476 ///
477 /// // We use a temporary directory so that the socket
478 /// // files left by the bound sockets will get cleaned up.
479 /// let tmp = tempdir()?;
480 ///
481 /// // Bind the socket to a filesystem path
482 /// let socket_path = tmp.path().join("socket");
483 /// let std_socket = StdUDS::bind(&socket_path)?;
484 /// std_socket.set_nonblocking(true)?;
485 /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
486 ///
487 /// # Ok(())
488 /// # }
489 /// ```
490 #[track_caller]
491 pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
492 check_socket_for_blocking(&datagram)?;
493
494 let socket = mio::net::UnixDatagram::from_std(datagram);
495 let io = PollEvented::new(socket)?;
496 Ok(UnixDatagram { io })
497 }
498
499 /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
500 ///
501 /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
502 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
503 /// if needed.
504 ///
505 /// # Examples
506 ///
507 /// ```rust,no_run
508 /// # use std::error::Error;
509 /// # async fn dox() -> Result<(), Box<dyn Error>> {
510 /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
511 /// let std_socket = tokio_socket.into_std()?;
512 /// std_socket.set_nonblocking(false)?;
513 /// # Ok(())
514 /// # }
515 /// ```
516 ///
517 /// [`tokio::net::UnixDatagram`]: UnixDatagram
518 /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
519 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
520 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
521 self.io
522 .into_inner()
523 .map(IntoRawFd::into_raw_fd)
524 .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
525 }
526
527 fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
528 let io = PollEvented::new(socket)?;
529 Ok(UnixDatagram { io })
530 }
531
532 /// Creates a new `UnixDatagram` which is not bound to any address.
533 ///
534 /// # Examples
535 /// ```
536 /// # if cfg!(miri) { return } // No `socket` in miri.
537 /// # use std::error::Error;
538 /// # #[tokio::main]
539 /// # async fn main() -> Result<(), Box<dyn Error>> {
540 /// use tokio::net::UnixDatagram;
541 /// use tempfile::tempdir;
542 ///
543 /// // Create an unbound socket
544 /// let tx = UnixDatagram::unbound()?;
545 ///
546 /// // Create another, bound socket
547 /// let tmp = tempdir()?;
548 /// let rx_path = tmp.path().join("rx");
549 /// let rx = UnixDatagram::bind(&rx_path)?;
550 ///
551 /// // Send to the bound socket
552 /// let bytes = b"hello world";
553 /// tx.send_to(bytes, &rx_path).await?;
554 ///
555 /// let mut buf = vec![0u8; 24];
556 /// let (size, addr) = rx.recv_from(&mut buf).await?;
557 ///
558 /// let dgram = &buf[..size];
559 /// assert_eq!(dgram, bytes);
560 ///
561 /// # Ok(())
562 /// # }
563 /// ```
564 pub fn unbound() -> io::Result<UnixDatagram> {
565 let socket = mio::net::UnixDatagram::unbound()?;
566 UnixDatagram::new(socket)
567 }
568
569 /// Connects the socket to the specified address.
570 ///
571 /// The `send` method may be used to send data to the specified address.
572 /// `recv` and `recv_from` will only receive data from that address.
573 ///
574 /// # Examples
575 /// ```
576 /// # if cfg!(miri) { return } // No `socket` in miri.
577 /// # use std::error::Error;
578 /// # #[tokio::main]
579 /// # async fn main() -> Result<(), Box<dyn Error>> {
580 /// use tokio::net::UnixDatagram;
581 /// use tempfile::tempdir;
582 ///
583 /// // Create an unbound socket
584 /// let tx = UnixDatagram::unbound()?;
585 ///
586 /// // Create another, bound socket
587 /// let tmp = tempdir()?;
588 /// let rx_path = tmp.path().join("rx");
589 /// let rx = UnixDatagram::bind(&rx_path)?;
590 ///
591 /// // Connect to the bound socket
592 /// tx.connect(&rx_path)?;
593 ///
594 /// // Send to the bound socket
595 /// let bytes = b"hello world";
596 /// tx.send(bytes).await?;
597 ///
598 /// let mut buf = vec![0u8; 24];
599 /// let (size, addr) = rx.recv_from(&mut buf).await?;
600 ///
601 /// let dgram = &buf[..size];
602 /// assert_eq!(dgram, bytes);
603 ///
604 /// # Ok(())
605 /// # }
606 /// ```
607 pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
608 self.io.connect(path)
609 }
610
611 /// Sends data on the socket to the socket's peer.
612 ///
613 /// # Cancel safety
614 ///
615 /// This method is cancel safe. If `send` is used as the event in a
616 /// [`tokio::select!`](crate::select) statement and some other branch
617 /// completes first, then it is guaranteed that the message was not sent.
618 ///
619 /// # Examples
620 /// ```
621 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
622 /// # use std::error::Error;
623 /// # #[tokio::main]
624 /// # async fn main() -> Result<(), Box<dyn Error>> {
625 /// use tokio::net::UnixDatagram;
626 ///
627 /// // Create the pair of sockets
628 /// let (sock1, sock2) = UnixDatagram::pair()?;
629 ///
630 /// // Since the sockets are paired, the paired send/recv
631 /// // functions can be used
632 /// let bytes = b"hello world";
633 /// sock1.send(bytes).await?;
634 ///
635 /// let mut buff = vec![0u8; 24];
636 /// let size = sock2.recv(&mut buff).await?;
637 ///
638 /// let dgram = &buff[..size];
639 /// assert_eq!(dgram, bytes);
640 ///
641 /// # Ok(())
642 /// # }
643 /// ```
644 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
645 self.io
646 .registration()
647 .async_io(Interest::WRITABLE, || self.io.send(buf))
648 .await
649 }
650
651 /// Tries to send a datagram to the peer without waiting.
652 ///
653 /// # Examples
654 ///
655 /// ```no_run
656 /// use tokio::net::UnixDatagram;
657 /// use std::io;
658 ///
659 /// #[tokio::main]
660 /// async fn main() -> io::Result<()> {
661 /// let dir = tempfile::tempdir().unwrap();
662 /// let client_path = dir.path().join("client.sock");
663 /// let server_path = dir.path().join("server.sock");
664 /// let socket = UnixDatagram::bind(&client_path)?;
665 /// socket.connect(&server_path)?;
666 ///
667 /// loop {
668 /// // Wait for the socket to be writable
669 /// socket.writable().await?;
670 ///
671 /// // Try to send data, this may still fail with `WouldBlock`
672 /// // if the readiness event is a false positive.
673 /// match socket.try_send(b"hello world") {
674 /// Ok(n) => {
675 /// break;
676 /// }
677 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
678 /// continue;
679 /// }
680 /// Err(e) => {
681 /// return Err(e);
682 /// }
683 /// }
684 /// }
685 ///
686 /// Ok(())
687 /// }
688 /// ```
689 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
690 self.io
691 .registration()
692 .try_io(Interest::WRITABLE, || self.io.send(buf))
693 }
694
/// Tries to send a datagram to the specified address without waiting.
696 ///
697 /// # Examples
698 ///
699 /// ```no_run
700 /// use tokio::net::UnixDatagram;
701 /// use std::io;
702 ///
703 /// #[tokio::main]
704 /// async fn main() -> io::Result<()> {
705 /// let dir = tempfile::tempdir().unwrap();
706 /// let client_path = dir.path().join("client.sock");
707 /// let server_path = dir.path().join("server.sock");
708 /// let socket = UnixDatagram::bind(&client_path)?;
709 ///
710 /// loop {
711 /// // Wait for the socket to be writable
712 /// socket.writable().await?;
713 ///
714 /// // Try to send data, this may still fail with `WouldBlock`
715 /// // if the readiness event is a false positive.
716 /// match socket.try_send_to(b"hello world", &server_path) {
717 /// Ok(n) => {
718 /// break;
719 /// }
720 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
721 /// continue;
722 /// }
723 /// Err(e) => {
724 /// return Err(e);
725 /// }
726 /// }
727 /// }
728 ///
729 /// Ok(())
730 /// }
731 /// ```
732 pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
733 where
734 P: AsRef<Path>,
735 {
736 self.io
737 .registration()
738 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
739 }
740
741 /// Receives data from the socket.
742 ///
743 /// # Cancel safety
744 ///
745 /// This method is cancel safe. If `recv` is used as the event in a
746 /// [`tokio::select!`](crate::select) statement and some other branch
747 /// completes first, it is guaranteed that no messages were received on this
748 /// socket.
749 ///
750 /// # Examples
751 /// ```
752 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
753 /// # use std::error::Error;
754 /// # #[tokio::main]
755 /// # async fn main() -> Result<(), Box<dyn Error>> {
756 /// use tokio::net::UnixDatagram;
757 ///
758 /// // Create the pair of sockets
759 /// let (sock1, sock2) = UnixDatagram::pair()?;
760 ///
761 /// // Since the sockets are paired, the paired send/recv
762 /// // functions can be used
763 /// let bytes = b"hello world";
764 /// sock1.send(bytes).await?;
765 ///
766 /// let mut buff = vec![0u8; 24];
767 /// let size = sock2.recv(&mut buff).await?;
768 ///
769 /// let dgram = &buff[..size];
770 /// assert_eq!(dgram, bytes);
771 ///
772 /// # Ok(())
773 /// # }
774 /// ```
775 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
776 self.io
777 .registration()
778 .async_io(Interest::READABLE, || self.io.recv(buf))
779 .await
780 }
781
782 /// Tries to receive a datagram from the peer without waiting.
783 ///
784 /// # Examples
785 ///
786 /// ```no_run
787 /// use tokio::net::UnixDatagram;
788 /// use std::io;
789 ///
790 /// #[tokio::main]
791 /// async fn main() -> io::Result<()> {
792 /// // Connect to a peer
793 /// let dir = tempfile::tempdir().unwrap();
794 /// let client_path = dir.path().join("client.sock");
795 /// let server_path = dir.path().join("server.sock");
796 /// let socket = UnixDatagram::bind(&client_path)?;
797 /// socket.connect(&server_path)?;
798 ///
799 /// loop {
800 /// // Wait for the socket to be readable
801 /// socket.readable().await?;
802 ///
803 /// // The buffer is **not** included in the async task and will
804 /// // only exist on the stack.
805 /// let mut buf = [0; 1024];
806 ///
807 /// // Try to recv data, this may still fail with `WouldBlock`
808 /// // if the readiness event is a false positive.
809 /// match socket.try_recv(&mut buf) {
810 /// Ok(n) => {
811 /// println!("GOT {:?}", &buf[..n]);
812 /// break;
813 /// }
814 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
815 /// continue;
816 /// }
817 /// Err(e) => {
818 /// return Err(e);
819 /// }
820 /// }
821 /// }
822 ///
823 /// Ok(())
824 /// }
825 /// ```
826 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
827 self.io
828 .registration()
829 .try_io(Interest::READABLE, || self.io.recv(buf))
830 }
831
832 cfg_io_util! {
833 /// Tries to receive data from the socket without waiting.
834 ///
835 /// This method can be used even if `buf` is uninitialized.
836 ///
837 /// # Examples
838 ///
839 /// ```no_run
840 /// use tokio::net::UnixDatagram;
841 /// use std::io;
842 ///
843 /// #[tokio::main]
844 /// async fn main() -> io::Result<()> {
845 /// // Connect to a peer
846 /// let dir = tempfile::tempdir().unwrap();
847 /// let client_path = dir.path().join("client.sock");
848 /// let server_path = dir.path().join("server.sock");
849 /// let socket = UnixDatagram::bind(&client_path)?;
850 ///
851 /// loop {
852 /// // Wait for the socket to be readable
853 /// socket.readable().await?;
854 ///
855 /// let mut buf = Vec::with_capacity(1024);
856 ///
857 /// // Try to recv data, this may still fail with `WouldBlock`
858 /// // if the readiness event is a false positive.
859 /// match socket.try_recv_buf_from(&mut buf) {
860 /// Ok((n, _addr)) => {
861 /// println!("GOT {:?}", &buf[..n]);
862 /// break;
863 /// }
864 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
865 /// continue;
866 /// }
867 /// Err(e) => {
868 /// return Err(e);
869 /// }
870 /// }
871 /// }
872 ///
873 /// Ok(())
874 /// }
875 /// ```
876 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
877 let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
878 let dst = buf.chunk_mut();
879 let dst =
880 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
881
882 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
883 // buffer.
884 let (n, addr) = (*self.io).recv_from(dst)?;
885
886 unsafe {
887 buf.advance_mut(n);
888 }
889
890 Ok((n, addr))
891 })?;
892
893 Ok((n, SocketAddr(addr)))
894 }
895
896 /// Receives from the socket, advances the
897 /// buffer's internal cursor and returns how many bytes were read and the origin.
898 ///
899 /// This method can be used even if `buf` is uninitialized.
900 ///
901 /// # Examples
902 /// ```
903 /// # if cfg!(miri) { return } // No `socket` in miri.
904 /// # use std::error::Error;
905 /// # #[tokio::main]
906 /// # async fn main() -> Result<(), Box<dyn Error>> {
907 /// use tokio::net::UnixDatagram;
908 /// use tempfile::tempdir;
909 ///
910 /// // We use a temporary directory so that the socket
911 /// // files left by the bound sockets will get cleaned up.
912 /// let tmp = tempdir()?;
913 ///
914 /// // Bind each socket to a filesystem path
915 /// let tx_path = tmp.path().join("tx");
916 /// let tx = UnixDatagram::bind(&tx_path)?;
917 /// let rx_path = tmp.path().join("rx");
918 /// let rx = UnixDatagram::bind(&rx_path)?;
919 ///
920 /// let bytes = b"hello world";
921 /// tx.send_to(bytes, &rx_path).await?;
922 ///
923 /// let mut buf = Vec::with_capacity(24);
924 /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
925 ///
926 /// let dgram = &buf[..size];
927 /// assert_eq!(dgram, bytes);
928 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
929 ///
930 /// # Ok(())
931 /// # }
932 /// ```
933 pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
934 self.io.registration().async_io(Interest::READABLE, || {
935 let dst = buf.chunk_mut();
936 let dst =
937 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
938
939 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
940 // buffer.
941 let (n, addr) = (*self.io).recv_from(dst)?;
942
943 unsafe {
944 buf.advance_mut(n);
945 }
946 Ok((n,SocketAddr(addr)))
947 }).await
948 }
949
/// Tries to receive data from the socket into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
952 ///
953 /// This method can be used even if `buf` is uninitialized.
954 ///
955 /// # Examples
956 ///
957 /// ```no_run
958 /// use tokio::net::UnixDatagram;
959 /// use std::io;
960 ///
961 /// #[tokio::main]
962 /// async fn main() -> io::Result<()> {
963 /// // Connect to a peer
964 /// let dir = tempfile::tempdir().unwrap();
965 /// let client_path = dir.path().join("client.sock");
966 /// let server_path = dir.path().join("server.sock");
967 /// let socket = UnixDatagram::bind(&client_path)?;
968 /// socket.connect(&server_path)?;
969 ///
970 /// loop {
971 /// // Wait for the socket to be readable
972 /// socket.readable().await?;
973 ///
974 /// let mut buf = Vec::with_capacity(1024);
975 ///
976 /// // Try to recv data, this may still fail with `WouldBlock`
977 /// // if the readiness event is a false positive.
978 /// match socket.try_recv_buf(&mut buf) {
979 /// Ok(n) => {
980 /// println!("GOT {:?}", &buf[..n]);
981 /// break;
982 /// }
983 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
984 /// continue;
985 /// }
986 /// Err(e) => {
987 /// return Err(e);
988 /// }
989 /// }
990 /// }
991 ///
992 /// Ok(())
993 /// }
994 /// ```
995 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
996 self.io.registration().try_io(Interest::READABLE, || {
997 let dst = buf.chunk_mut();
998 let dst =
999 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1000
1001 // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
1002 // buffer.
1003 let n = (*self.io).recv(dst)?;
1004
1005 unsafe {
1006 buf.advance_mut(n);
1007 }
1008
1009 Ok(n)
1010 })
1011 }
1012
1013 /// Receives data from the socket from the address to which it is connected,
1014 /// advancing the buffer's internal cursor, returning how many bytes were read.
1015 ///
1016 /// This method can be used even if `buf` is uninitialized.
1017 ///
1018 /// # Examples
1019 /// ```
1020 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1021 /// # use std::error::Error;
1022 /// # #[tokio::main]
1023 /// # async fn main() -> Result<(), Box<dyn Error>> {
1024 /// use tokio::net::UnixDatagram;
1025 ///
1026 /// // Create the pair of sockets
1027 /// let (sock1, sock2) = UnixDatagram::pair()?;
1028 ///
1029 /// // Since the sockets are paired, the paired send/recv
1030 /// // functions can be used
1031 /// let bytes = b"hello world";
1032 /// sock1.send(bytes).await?;
1033 ///
1034 /// let mut buff = Vec::with_capacity(24);
1035 /// let size = sock2.recv_buf(&mut buff).await?;
1036 ///
1037 /// let dgram = &buff[..size];
1038 /// assert_eq!(dgram, bytes);
1039 ///
1040 /// # Ok(())
1041 /// # }
1042 /// ```
1043 pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1044 self.io.registration().async_io(Interest::READABLE, || {
1045 let dst = buf.chunk_mut();
1046 let dst =
1047 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1048
1049 // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
1050 // buffer.
1051 let n = (*self.io).recv(dst)?;
1052
1053 unsafe {
1054 buf.advance_mut(n);
1055 }
1056 Ok(n)
1057 }).await
1058 }
1059 }
1060
1061 /// Sends data on the socket to the specified address.
1062 ///
1063 /// # Cancel safety
1064 ///
1065 /// This method is cancel safe. If `send_to` is used as the event in a
1066 /// [`tokio::select!`](crate::select) statement and some other branch
1067 /// completes first, then it is guaranteed that the message was not sent.
1068 ///
1069 /// # Examples
1070 /// ```
1071 /// # if cfg!(miri) { return } // No `socket` in miri.
1072 /// # use std::error::Error;
1073 /// # #[tokio::main]
1074 /// # async fn main() -> Result<(), Box<dyn Error>> {
1075 /// use tokio::net::UnixDatagram;
1076 /// use tempfile::tempdir;
1077 ///
1078 /// // We use a temporary directory so that the socket
1079 /// // files left by the bound sockets will get cleaned up.
1080 /// let tmp = tempdir()?;
1081 ///
1082 /// // Bind each socket to a filesystem path
1083 /// let tx_path = tmp.path().join("tx");
1084 /// let tx = UnixDatagram::bind(&tx_path)?;
1085 /// let rx_path = tmp.path().join("rx");
1086 /// let rx = UnixDatagram::bind(&rx_path)?;
1087 ///
1088 /// let bytes = b"hello world";
1089 /// tx.send_to(bytes, &rx_path).await?;
1090 ///
1091 /// let mut buf = vec![0u8; 24];
1092 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1093 ///
1094 /// let dgram = &buf[..size];
1095 /// assert_eq!(dgram, bytes);
1096 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1097 ///
1098 /// # Ok(())
1099 /// # }
1100 /// ```
1101 pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
1102 where
1103 P: AsRef<Path>,
1104 {
1105 self.io
1106 .registration()
1107 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
1108 .await
1109 }
1110
1111 /// Receives data from the socket.
1112 ///
1113 /// # Cancel safety
1114 ///
1115 /// This method is cancel safe. If `recv_from` is used as the event in a
1116 /// [`tokio::select!`](crate::select) statement and some other branch
1117 /// completes first, it is guaranteed that no messages were received on this
1118 /// socket.
1119 ///
1120 /// # Examples
1121 /// ```
1122 /// # if cfg!(miri) { return } // No `socket` in miri.
1123 /// # use std::error::Error;
1124 /// # #[tokio::main]
1125 /// # async fn main() -> Result<(), Box<dyn Error>> {
1126 /// use tokio::net::UnixDatagram;
1127 /// use tempfile::tempdir;
1128 ///
1129 /// // We use a temporary directory so that the socket
1130 /// // files left by the bound sockets will get cleaned up.
1131 /// let tmp = tempdir()?;
1132 ///
1133 /// // Bind each socket to a filesystem path
1134 /// let tx_path = tmp.path().join("tx");
1135 /// let tx = UnixDatagram::bind(&tx_path)?;
1136 /// let rx_path = tmp.path().join("rx");
1137 /// let rx = UnixDatagram::bind(&rx_path)?;
1138 ///
1139 /// let bytes = b"hello world";
1140 /// tx.send_to(bytes, &rx_path).await?;
1141 ///
1142 /// let mut buf = vec![0u8; 24];
1143 /// let (size, addr) = rx.recv_from(&mut buf).await?;
1144 ///
1145 /// let dgram = &buf[..size];
1146 /// assert_eq!(dgram, bytes);
1147 /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
1148 ///
1149 /// # Ok(())
1150 /// # }
1151 /// ```
1152 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1153 let (n, addr) = self
1154 .io
1155 .registration()
1156 .async_io(Interest::READABLE, || self.io.recv_from(buf))
1157 .await?;
1158
1159 Ok((n, SocketAddr(addr)))
1160 }
1161
/// Attempts to receive a single datagram on the socket, reporting its sender's address.
1163 ///
1164 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1165 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1166 /// receive a wakeup.
1167 ///
1168 /// # Return value
1169 ///
1170 /// The function returns:
1171 ///
1172 /// * `Poll::Pending` if the socket is not ready to read
1173 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1174 /// * `Poll::Ready(Err(e))` if an error is encountered.
1175 ///
1176 /// # Errors
1177 ///
1178 /// This function may encounter any standard I/O error except `WouldBlock`.
1179 pub fn poll_recv_from(
1180 &self,
1181 cx: &mut Context<'_>,
1182 buf: &mut ReadBuf<'_>,
1183 ) -> Poll<io::Result<SocketAddr>> {
1184 #[allow(clippy::blocks_in_conditions)]
1185 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1186 // Safety: will not read the maybe uninitialized bytes.
1187 let b = unsafe {
1188 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1189 };
1190
1191 self.io.recv_from(b)
1192 }))?;
1193
1194 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1195 unsafe {
1196 buf.assume_init(n);
1197 }
1198 buf.advance(n);
1199 Poll::Ready(Ok(SocketAddr(addr)))
1200 }
1201
1202 /// Attempts to send data to the specified address.
1203 ///
1204 /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1205 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1206 /// receive a wakeup.
1207 ///
1208 /// # Return value
1209 ///
1210 /// The function returns:
1211 ///
1212 /// * `Poll::Pending` if the socket is not ready to write
1213 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1214 /// * `Poll::Ready(Err(e))` if an error is encountered.
1215 ///
1216 /// # Errors
1217 ///
1218 /// This function may encounter any standard I/O error except `WouldBlock`.
1219 pub fn poll_send_to<P>(
1220 &self,
1221 cx: &mut Context<'_>,
1222 buf: &[u8],
1223 target: P,
1224 ) -> Poll<io::Result<usize>>
1225 where
1226 P: AsRef<Path>,
1227 {
1228 self.io
1229 .registration()
1230 .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
1231 }
1232
1233 /// Attempts to send data on the socket to the remote address to which it
1234 /// was previously `connect`ed.
1235 ///
1236 /// The [`connect`] method will connect this socket to a remote address.
1237 /// This method will fail if the socket is not connected.
1238 ///
1239 /// Note that on multiple calls to a `poll_*` method in the send direction,
1240 /// only the `Waker` from the `Context` passed to the most recent call will
1241 /// be scheduled to receive a wakeup.
1242 ///
1243 /// # Return value
1244 ///
1245 /// The function returns:
1246 ///
1247 /// * `Poll::Pending` if the socket is not available to write
1248 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
1249 /// * `Poll::Ready(Err(e))` if an error is encountered.
1250 ///
1251 /// # Errors
1252 ///
1253 /// This function may encounter any standard I/O error except `WouldBlock`.
1254 ///
1255 /// [`connect`]: method@Self::connect
1256 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
1257 self.io
1258 .registration()
1259 .poll_write_io(cx, || self.io.send(buf))
1260 }
1261
1262 /// Attempts to receive a single datagram message on the socket from the remote
1263 /// address to which it is `connect`ed.
1264 ///
1265 /// The [`connect`] method will connect this socket to a remote address. This method
1266 /// resolves to an error if the socket is not connected.
1267 ///
1268 /// Note that on multiple calls to a `poll_*` method in the `recv` direction, only the
1269 /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1270 /// receive a wakeup.
1271 ///
1272 /// # Return value
1273 ///
1274 /// The function returns:
1275 ///
1276 /// * `Poll::Pending` if the socket is not ready to read
/// * `Poll::Ready(Ok(()))` reads data into `ReadBuf` if the socket is ready
1278 /// * `Poll::Ready(Err(e))` if an error is encountered.
1279 ///
1280 /// # Errors
1281 ///
1282 /// This function may encounter any standard I/O error except `WouldBlock`.
1283 ///
1284 /// [`connect`]: method@Self::connect
1285 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
1286 #[allow(clippy::blocks_in_conditions)]
1287 let n = ready!(self.io.registration().poll_read_io(cx, || {
1288 // Safety: will not read the maybe uninitialized bytes.
1289 let b = unsafe {
1290 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1291 };
1292
1293 self.io.recv(b)
1294 }))?;
1295
1296 // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1297 unsafe {
1298 buf.assume_init(n);
1299 }
1300 buf.advance(n);
1301 Poll::Ready(Ok(()))
1302 }
1303
1304 /// Tries to receive data from the socket without waiting.
1305 ///
1306 /// # Examples
1307 ///
1308 /// ```no_run
1309 /// use tokio::net::UnixDatagram;
1310 /// use std::io;
1311 ///
1312 /// #[tokio::main]
1313 /// async fn main() -> io::Result<()> {
///     // Bind the socket to a local path
1315 /// let dir = tempfile::tempdir().unwrap();
1316 /// let client_path = dir.path().join("client.sock");
1317 /// let server_path = dir.path().join("server.sock");
1318 /// let socket = UnixDatagram::bind(&client_path)?;
1319 ///
1320 /// loop {
1321 /// // Wait for the socket to be readable
1322 /// socket.readable().await?;
1323 ///
1324 /// // The buffer is **not** included in the async task and will
1325 /// // only exist on the stack.
1326 /// let mut buf = [0; 1024];
1327 ///
1328 /// // Try to recv data, this may still fail with `WouldBlock`
1329 /// // if the readiness event is a false positive.
1330 /// match socket.try_recv_from(&mut buf) {
1331 /// Ok((n, _addr)) => {
1332 /// println!("GOT {:?}", &buf[..n]);
1333 /// break;
1334 /// }
1335 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1336 /// continue;
1337 /// }
1338 /// Err(e) => {
1339 /// return Err(e);
1340 /// }
1341 /// }
1342 /// }
1343 ///
1344 /// Ok(())
1345 /// }
1346 /// ```
1347 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1348 let (n, addr) = self
1349 .io
1350 .registration()
1351 .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
1352
1353 Ok((n, SocketAddr(addr)))
1354 }
1355
1356 /// Tries to read or write from the socket using a user-provided IO operation.
1357 ///
1358 /// If the socket is ready, the provided closure is called. The closure
1359 /// should attempt to perform IO operation on the socket by manually
1360 /// calling the appropriate syscall. If the operation fails because the
1361 /// socket is not actually ready, then the closure should return a
1362 /// `WouldBlock` error and the readiness flag is cleared. The return value
1363 /// of the closure is then returned by `try_io`.
1364 ///
1365 /// If the socket is not ready, then the closure is not called
1366 /// and a `WouldBlock` error is returned.
1367 ///
1368 /// The closure should only return a `WouldBlock` error if it has performed
1369 /// an IO operation on the socket that failed due to the socket not being
1370 /// ready. Returning a `WouldBlock` error in any other situation will
1371 /// incorrectly clear the readiness flag, which can cause the socket to
1372 /// behave incorrectly.
1373 ///
1374 /// The closure should not perform the IO operation using any of the methods
1375 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1376 /// readiness flag and can cause the socket to behave incorrectly.
1377 ///
1378 /// This method is not intended to be used with combined interests.
1379 /// The closure should perform only one type of IO operation, so it should not
1380 /// require more than one ready state. This method may panic or sleep forever
1381 /// if it is called with a combined interest.
1382 ///
1383 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1384 ///
1385 /// [`readable()`]: UnixDatagram::readable()
1386 /// [`writable()`]: UnixDatagram::writable()
1387 /// [`ready()`]: UnixDatagram::ready()
1388 pub fn try_io<R>(
1389 &self,
1390 interest: Interest,
1391 f: impl FnOnce() -> io::Result<R>,
1392 ) -> io::Result<R> {
1393 self.io
1394 .registration()
1395 .try_io(interest, || self.io.try_io(f))
1396 }
1397
1398 /// Reads or writes from the socket using a user-provided IO operation.
1399 ///
1400 /// The readiness of the socket is awaited and when the socket is ready,
1401 /// the provided closure is called. The closure should attempt to perform
1402 /// IO operation on the socket by manually calling the appropriate syscall.
1403 /// If the operation fails because the socket is not actually ready,
1404 /// then the closure should return a `WouldBlock` error. In such case the
1405 /// readiness flag is cleared and the socket readiness is awaited again.
1406 /// This loop is repeated until the closure returns an `Ok` or an error
1407 /// other than `WouldBlock`.
1408 ///
1409 /// The closure should only return a `WouldBlock` error if it has performed
1410 /// an IO operation on the socket that failed due to the socket not being
1411 /// ready. Returning a `WouldBlock` error in any other situation will
1412 /// incorrectly clear the readiness flag, which can cause the socket to
1413 /// behave incorrectly.
1414 ///
1415 /// The closure should not perform the IO operation using any of the methods
1416 /// defined on the Tokio `UnixDatagram` type, as this will mess with the
1417 /// readiness flag and can cause the socket to behave incorrectly.
1418 ///
1419 /// This method is not intended to be used with combined interests.
1420 /// The closure should perform only one type of IO operation, so it should not
1421 /// require more than one ready state. This method may panic or sleep forever
1422 /// if it is called with a combined interest.
1423 pub async fn async_io<R>(
1424 &self,
1425 interest: Interest,
1426 mut f: impl FnMut() -> io::Result<R>,
1427 ) -> io::Result<R> {
1428 self.io
1429 .registration()
1430 .async_io(interest, || self.io.try_io(&mut f))
1431 .await
1432 }
1433
1434 /// Returns the local address that this socket is bound to.
1435 ///
1436 /// # Examples
1437 /// For a socket bound to a local path
1438 /// ```
1439 /// # if cfg!(miri) { return } // No `socket` in miri.
1440 /// # use std::error::Error;
1441 /// # #[tokio::main]
1442 /// # async fn main() -> Result<(), Box<dyn Error>> {
1443 /// use tokio::net::UnixDatagram;
1444 /// use tempfile::tempdir;
1445 ///
1446 /// // We use a temporary directory so that the socket
1447 /// // files left by the bound sockets will get cleaned up.
1448 /// let tmp = tempdir()?;
1449 ///
1450 /// // Bind socket to a filesystem path
1451 /// let socket_path = tmp.path().join("socket");
1452 /// let socket = UnixDatagram::bind(&socket_path)?;
1453 ///
1454 /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
1455 ///
1456 /// # Ok(())
1457 /// # }
1458 /// ```
1459 ///
1460 /// For an unbound socket
1461 /// ```
1462 /// # if cfg!(miri) { return } // No `socket` in miri.
1463 /// # use std::error::Error;
1464 /// # #[tokio::main]
1465 /// # async fn main() -> Result<(), Box<dyn Error>> {
1466 /// use tokio::net::UnixDatagram;
1467 ///
1468 /// // Create an unbound socket
1469 /// let socket = UnixDatagram::unbound()?;
1470 ///
1471 /// assert!(socket.local_addr()?.is_unnamed());
1472 ///
1473 /// # Ok(())
1474 /// # }
1475 /// ```
1476 pub fn local_addr(&self) -> io::Result<SocketAddr> {
1477 self.io.local_addr().map(SocketAddr)
1478 }
1479
1480 /// Returns the address of this socket's peer.
1481 ///
1482 /// The `connect` method will connect the socket to a peer.
1483 ///
1484 /// # Examples
1485 /// For a peer with a local path
1486 /// ```
1487 /// # if cfg!(miri) { return } // No `socket` in miri.
1488 /// # use std::error::Error;
1489 /// # #[tokio::main]
1490 /// # async fn main() -> Result<(), Box<dyn Error>> {
1491 /// use tokio::net::UnixDatagram;
1492 /// use tempfile::tempdir;
1493 ///
1494 /// // Create an unbound socket
1495 /// let tx = UnixDatagram::unbound()?;
1496 ///
1497 /// // Create another, bound socket
1498 /// let tmp = tempdir()?;
1499 /// let rx_path = tmp.path().join("rx");
1500 /// let rx = UnixDatagram::bind(&rx_path)?;
1501 ///
1502 /// // Connect to the bound socket
1503 /// tx.connect(&rx_path)?;
1504 ///
1505 /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
1506 ///
1507 /// # Ok(())
1508 /// # }
1509 /// ```
1510 ///
1511 /// For an unbound peer
1512 /// ```
1513 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1514 /// # use std::error::Error;
1515 /// # #[tokio::main]
1516 /// # async fn main() -> Result<(), Box<dyn Error>> {
1517 /// use tokio::net::UnixDatagram;
1518 ///
1519 /// // Create the pair of sockets
1520 /// let (sock1, sock2) = UnixDatagram::pair()?;
1521 ///
1522 /// assert!(sock1.peer_addr()?.is_unnamed());
1523 ///
1524 /// # Ok(())
1525 /// # }
1526 /// ```
1527 pub fn peer_addr(&self) -> io::Result<SocketAddr> {
1528 self.io.peer_addr().map(SocketAddr)
1529 }
1530
1531 /// Returns the value of the `SO_ERROR` option.
1532 ///
1533 /// # Examples
1534 /// ```
1535 /// # if cfg!(miri) { return } // No `socket` in miri.
1536 /// # use std::error::Error;
1537 /// # #[tokio::main]
1538 /// # async fn main() -> Result<(), Box<dyn Error>> {
1539 /// use tokio::net::UnixDatagram;
1540 ///
1541 /// // Create an unbound socket
1542 /// let socket = UnixDatagram::unbound()?;
1543 ///
1544 /// if let Ok(Some(err)) = socket.take_error() {
1545 /// println!("Got error: {:?}", err);
1546 /// }
1547 ///
1548 /// # Ok(())
1549 /// # }
1550 /// ```
1551 pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1552 self.io.take_error()
1553 }
1554
1555 /// Shuts down the read, write, or both halves of this connection.
1556 ///
1557 /// This function will cause all pending and future I/O calls on the
1558 /// specified portions to immediately return with an appropriate value
1559 /// (see the documentation of `Shutdown`).
1560 ///
1561 /// # Examples
1562 /// ```
1563 /// # if cfg!(miri) { return } // No SOCK_DGRAM for `socketpair` in miri.
1564 /// # use std::error::Error;
1565 /// # #[tokio::main]
1566 /// # async fn main() -> Result<(), Box<dyn Error>> {
1567 /// use tokio::net::UnixDatagram;
1568 /// use std::net::Shutdown;
1569 ///
/// // Create the pair of sockets
1571 /// let (socket, other) = UnixDatagram::pair()?;
1572 ///
1573 /// socket.shutdown(Shutdown::Both)?;
1574 ///
1575 /// // NOTE: the following commented out code does NOT work as expected.
1576 /// // Due to an underlying issue, the recv call will block indefinitely.
1577 /// // See: https://github.com/tokio-rs/tokio/issues/1679
1578 /// //let mut buff = vec![0u8; 24];
1579 /// //let size = socket.recv(&mut buff).await?;
1580 /// //assert_eq!(size, 0);
1581 ///
1582 /// let send_result = socket.send(b"hello world").await;
1583 /// assert!(send_result.is_err());
1584 ///
1585 /// # Ok(())
1586 /// # }
1587 /// ```
1588 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
1589 self.io.shutdown(how)
1590 }
1591}
1592
1593impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
1594 type Error = io::Error;
1595
1596 /// Consumes stream, returning the Tokio I/O object.
1597 ///
1598 /// This is equivalent to
1599 /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
1600 fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
1601 Self::from_std(stream)
1602 }
1603}
1604
1605impl fmt::Debug for UnixDatagram {
1606 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1607 self.io.fmt(f)
1608 }
1609}
1610
1611impl AsRawFd for UnixDatagram {
1612 fn as_raw_fd(&self) -> RawFd {
1613 self.io.as_raw_fd()
1614 }
1615}
1616
1617impl AsFd for UnixDatagram {
1618 fn as_fd(&self) -> BorrowedFd<'_> {
1619 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1620 }
1621}