tokio/runtime/io/
registration.rs

1#![cfg_attr(not(feature = "net"), allow(dead_code))]
2
3use crate::io::interest::Interest;
4use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
5use crate::runtime::scheduler;
6
7use mio::event::Source;
8use std::io;
9use std::sync::Arc;
10use std::task::{ready, Context, Poll};
11
12cfg_io_driver! {
13    /// Associates an I/O resource with the reactor instance that drives it.
14    ///
15    /// A registration represents an I/O resource registered with a Reactor such
16    /// that it will receive task notifications on readiness. This is the lowest
17    /// level API for integrating with a reactor.
18    ///
19    /// The association between an I/O resource is made by calling
20    /// [`new_with_interest_and_handle`].
21    /// Once the association is established, it remains established until the
22    /// registration instance is dropped.
23    ///
24    /// A registration instance represents two separate readiness streams. One
25    /// for the read readiness and one for write readiness. These streams are
26    /// independent and can be consumed from separate tasks.
27    ///
28    /// **Note**: while `Registration` is `Sync`, the caller must ensure that
29    /// there are at most two tasks that use a registration instance
30    /// concurrently. One task for [`poll_read_ready`] and one task for
31    /// [`poll_write_ready`]. While violating this requirement is "safe" from a
32    /// Rust memory safety point of view, it will result in unexpected behavior
33    /// in the form of lost notifications and tasks hanging.
34    ///
35    /// ## Platform-specific events
36    ///
37    /// `Registration` also allows receiving platform-specific `mio::Ready`
38    /// events. These events are included as part of the read readiness event
39    /// stream. The write readiness event stream is only for `Ready::writable()`
40    /// events.
41    ///
42    /// [`new_with_interest_and_handle`]: method@Self::new_with_interest_and_handle
43    /// [`poll_read_ready`]: method@Self::poll_read_ready`
44    /// [`poll_write_ready`]: method@Self::poll_write_ready`
45    #[derive(Debug)]
46    pub(crate) struct Registration {
47        /// Handle to the associated runtime.
48        ///
49        /// TODO: this can probably be moved into `ScheduledIo`.
50        handle: scheduler::Handle,
51
52        /// Reference to state stored by the driver.
53        shared: Arc<ScheduledIo>,
54    }
55}
56
57unsafe impl Send for Registration {}
58unsafe impl Sync for Registration {}
59
60// ===== impl Registration =====
61
62impl Registration {
63    /// Registers the I/O resource with the reactor for the provided handle, for
64    /// a specific `Interest`. This does not add `hup` or `error` so if you are
65    /// interested in those states, you will need to add them to the readiness
66    /// state passed to this function.
67    ///
68    /// # Return
69    ///
70    /// - `Ok` if the registration happened successfully
71    /// - `Err` if an error was encountered during registration
72    #[track_caller]
73    pub(crate) fn new_with_interest_and_handle(
74        io: &mut impl Source,
75        interest: Interest,
76        handle: scheduler::Handle,
77    ) -> io::Result<Registration> {
78        let shared = handle.driver().io().add_source(io, interest)?;
79
80        Ok(Registration { handle, shared })
81    }
82
83    /// Deregisters the I/O resource from the reactor it is associated with.
84    ///
85    /// This function must be called before the I/O resource associated with the
86    /// registration is dropped.
87    ///
88    /// Note that deregistering does not guarantee that the I/O resource can be
89    /// registered with a different reactor. Some I/O resource types can only be
90    /// associated with a single reactor instance for their lifetime.
91    ///
92    /// # Return
93    ///
94    /// If the deregistration was successful, `Ok` is returned. Any calls to
95    /// `Reactor::turn` that happen after a successful call to `deregister` will
96    /// no longer result in notifications getting sent for this registration.
97    ///
98    /// `Err` is returned if an error is encountered.
99    pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
100        self.handle().deregister_source(&self.shared, io)
101    }
102
103    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
104        self.shared.clear_readiness(event);
105    }
106
107    // Uses the poll path, requiring the caller to ensure mutual exclusion for
108    // correctness. Only the last task to call this function is notified.
109    pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
110        self.poll_ready(cx, Direction::Read)
111    }
112
113    // Uses the poll path, requiring the caller to ensure mutual exclusion for
114    // correctness. Only the last task to call this function is notified.
115    pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
116        self.poll_ready(cx, Direction::Write)
117    }
118
119    // Uses the poll path, requiring the caller to ensure mutual exclusion for
120    // correctness. Only the last task to call this function is notified.
121    #[cfg(not(target_os = "wasi"))]
122    pub(crate) fn poll_read_io<R>(
123        &self,
124        cx: &mut Context<'_>,
125        f: impl FnMut() -> io::Result<R>,
126    ) -> Poll<io::Result<R>> {
127        self.poll_io(cx, Direction::Read, f)
128    }
129
130    // Uses the poll path, requiring the caller to ensure mutual exclusion for
131    // correctness. Only the last task to call this function is notified.
132    pub(crate) fn poll_write_io<R>(
133        &self,
134        cx: &mut Context<'_>,
135        f: impl FnMut() -> io::Result<R>,
136    ) -> Poll<io::Result<R>> {
137        self.poll_io(cx, Direction::Write, f)
138    }
139
140    /// Polls for events on the I/O resource's `direction` readiness stream.
141    ///
142    /// If called with a task context, notify the task when a new event is
143    /// received.
144    fn poll_ready(
145        &self,
146        cx: &mut Context<'_>,
147        direction: Direction,
148    ) -> Poll<io::Result<ReadyEvent>> {
149        ready!(crate::trace::trace_leaf(cx));
150        // Keep track of task budget
151        let coop = ready!(crate::task::coop::poll_proceed(cx));
152        let ev = ready!(self.shared.poll_readiness(cx, direction));
153
154        if ev.is_shutdown {
155            return Poll::Ready(Err(gone()));
156        }
157
158        coop.made_progress();
159        Poll::Ready(Ok(ev))
160    }
161
162    fn poll_io<R>(
163        &self,
164        cx: &mut Context<'_>,
165        direction: Direction,
166        mut f: impl FnMut() -> io::Result<R>,
167    ) -> Poll<io::Result<R>> {
168        loop {
169            let ev = ready!(self.poll_ready(cx, direction))?;
170
171            match f() {
172                Ok(ret) => {
173                    return Poll::Ready(Ok(ret));
174                }
175                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
176                    self.clear_readiness(ev);
177                }
178                Err(e) => return Poll::Ready(Err(e)),
179            }
180        }
181    }
182
183    pub(crate) fn try_io<R>(
184        &self,
185        interest: Interest,
186        f: impl FnOnce() -> io::Result<R>,
187    ) -> io::Result<R> {
188        let ev = self.shared.ready_event(interest);
189
190        // Don't attempt the operation if the resource is not ready.
191        if ev.ready.is_empty() {
192            return Err(io::ErrorKind::WouldBlock.into());
193        }
194
195        match f() {
196            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
197                self.clear_readiness(ev);
198                Err(io::ErrorKind::WouldBlock.into())
199            }
200            res => res,
201        }
202    }
203
204    pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
205        let ev = self.shared.readiness(interest).await;
206
207        if ev.is_shutdown {
208            return Err(gone());
209        }
210
211        Ok(ev)
212    }
213
214    pub(crate) async fn async_io<R>(
215        &self,
216        interest: Interest,
217        mut f: impl FnMut() -> io::Result<R>,
218    ) -> io::Result<R> {
219        loop {
220            let event = self.readiness(interest).await?;
221
222            let coop = std::future::poll_fn(crate::task::coop::poll_proceed).await;
223
224            match f() {
225                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
226                    self.clear_readiness(event);
227                }
228                x => {
229                    coop.made_progress();
230                    return x;
231                }
232            }
233        }
234    }
235
236    fn handle(&self) -> &Handle {
237        self.handle.driver().io()
238    }
239}
240
241impl Drop for Registration {
242    fn drop(&mut self) {
243        // It is possible for a cycle to be created between wakers stored in
244        // `ScheduledIo` instances and `Arc<driver::Inner>`. To break this
245        // cycle, wakers are cleared. This is an imperfect solution as it is
246        // possible to store a `Registration` in a waker. In this case, the
247        // cycle would remain.
248        //
249        // See tokio-rs/tokio#3481 for more details.
250        self.shared.clear_wakers();
251    }
252}
253
254fn gone() -> io::Error {
255    io::Error::new(
256        io::ErrorKind::Other,
257        crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
258    )
259}