1// Signal handling
2cfg_signal_internal_and_unix! {
3mod signal;
4}
56use crate::io::interest::Interest;
7use crate::io::ready::Ready;
8use crate::loom::sync::Mutex;
9use crate::runtime::driver;
10use crate::runtime::io::registration_set;
11use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
1213use mio::event::Source;
14use std::fmt;
15use std::io;
16use std::sync::Arc;
17use std::time::Duration;
1819/// I/O driver, backed by Mio.
20pub(crate) struct Driver {
21/// True when an event with the signal token is received
22signal_ready: bool,
2324/// Reuse the `mio::Events` value across calls to poll.
25events: mio::Events,
2627/// The system event queue.
28poll: mio::Poll,
29}
3031/// A reference to an I/O driver.
32pub(crate) struct Handle {
33/// Registers I/O resources.
34registry: mio::Registry,
3536/// Tracks all registrations
37registrations: RegistrationSet,
3839/// State that should be synchronized
40synced: Mutex<registration_set::Synced>,
4142/// Used to wake up the reactor from a call to `turn`.
43 /// Not supported on `Wasi` due to lack of threading support.
44#[cfg(not(target_os = "wasi"))]
45waker: mio::Waker,
4647pub(crate) metrics: IoDriverMetrics,
48}
4950#[derive(Debug)]
51pub(crate) struct ReadyEvent {
52pub(super) tick: u8,
53pub(crate) ready: Ready,
54pub(super) is_shutdown: bool,
55}
5657cfg_net_unix!(
58impl ReadyEvent {
59pub(crate) fn with_ready(&self, ready: Ready) -> Self {
60Self {
61 ready,
62 tick: self.tick,
63 is_shutdown: self.is_shutdown,
64 }
65 }
66 }
67);
6869#[derive(Debug, Eq, PartialEq, Clone, Copy)]
70pub(super) enum Direction {
71 Read,
72 Write,
73}
7475pub(super) enum Tick {
76 Set,
77 Clear(u8),
78}
7980const TOKEN_WAKEUP: mio::Token = mio::Token(0);
81const TOKEN_SIGNAL: mio::Token = mio::Token(1);
8283fn _assert_kinds() {
84fn _assert<T: Send + Sync>() {}
8586 _assert::<Handle>();
87}
8889// ===== impl Driver =====
9091impl Driver {
92/// Creates a new event loop, returning any error that happened during the
93 /// creation.
94pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
95let poll = mio::Poll::new()?;
96#[cfg(not(target_os = "wasi"))]
97let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
98let registry = poll.registry().try_clone()?;
99100let driver = Driver {
101 signal_ready: false,
102 events: mio::Events::with_capacity(nevents),
103 poll,
104 };
105106let (registrations, synced) = RegistrationSet::new();
107108let handle = Handle {
109 registry,
110 registrations,
111 synced: Mutex::new(synced),
112#[cfg(not(target_os = "wasi"))]
113waker,
114 metrics: IoDriverMetrics::default(),
115 };
116117Ok((driver, handle))
118 }
119120pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
121let handle = rt_handle.io();
122self.turn(handle, None);
123 }
124125pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
126let handle = rt_handle.io();
127self.turn(handle, Some(duration));
128 }
129130pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
131let handle = rt_handle.io();
132let ios = handle.registrations.shutdown(&mut handle.synced.lock());
133134// `shutdown()` must be called without holding the lock.
135for io in ios {
136 io.shutdown();
137 }
138 }
139140fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
141debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
142143 handle.release_pending_registrations();
144145let events = &mut self.events;
146147// Block waiting for an event to happen, peeling out how many events
148 // happened.
149match self.poll.poll(events, max_wait) {
150Ok(()) => {}
151Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
152#[cfg(target_os = "wasi")]
153Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
154// In case of wasm32_wasi this error happens, when trying to poll without subscriptions
155 // just return from the park, as there would be nothing, which wakes us up.
156}
157Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
158 }
159160// Process all the events that came in, dispatching appropriately
161let mut ready_count = 0;
162for event in events.iter() {
163let token = event.token();
164165if token == TOKEN_WAKEUP {
166// Nothing to do, the event is used to unblock the I/O driver
167} else if token == TOKEN_SIGNAL {
168self.signal_ready = true;
169 } else {
170let ready = Ready::from_mio(event);
171let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
172173// Safety: we ensure that the pointers used as tokens are not freed
174 // until they are both deregistered from mio **and** we know the I/O
175 // driver is not concurrently polling. The I/O driver holds ownership of
176 // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
177let io: &ScheduledIo = unsafe { &*ptr };
178179 io.set_readiness(Tick::Set, |curr| curr | ready);
180 io.wake(ready);
181182 ready_count += 1;
183 }
184 }
185186 handle.metrics.incr_ready_count_by(ready_count);
187 }
188}
189190impl fmt::Debug for Driver {
191fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192write!(f, "Driver")
193 }
194}
195196impl Handle {
197/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
198 /// makes the next call to `turn` return immediately.
199 ///
200 /// This method is intended to be used in situations where a notification
201 /// needs to otherwise be sent to the main reactor. If the reactor is
202 /// currently blocked inside of `turn` then it will wake up and soon return
203 /// after this method has been called. If the reactor is not currently
204 /// blocked in `turn`, then the next call to `turn` will not block and
205 /// return immediately.
206pub(crate) fn unpark(&self) {
207#[cfg(not(target_os = "wasi"))]
208self.waker.wake().expect("failed to wake I/O driver");
209 }
210211/// Registers an I/O resource with the reactor for a given `mio::Ready` state.
212 ///
213 /// The registration token is returned.
214pub(super) fn add_source(
215&self,
216 source: &mut impl mio::event::Source,
217 interest: Interest,
218 ) -> io::Result<Arc<ScheduledIo>> {
219let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
220let token = scheduled_io.token();
221222// we should remove the `scheduled_io` from the `registrations` set if registering
223 // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`.
224if let Err(e) = self.registry.register(source, token, interest.to_mio()) {
225// safety: `scheduled_io` is part of the `registrations` set.
226unsafe {
227self.registrations
228 .remove(&mut self.synced.lock(), &scheduled_io)
229 };
230231return Err(e);
232 }
233234// TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
235self.metrics.incr_fd_count();
236237Ok(scheduled_io)
238 }
239240/// Deregisters an I/O resource from the reactor.
241pub(super) fn deregister_source(
242&self,
243 registration: &Arc<ScheduledIo>,
244 source: &mut impl Source,
245 ) -> io::Result<()> {
246// Deregister the source with the OS poller **first**
247self.registry.deregister(source)?;
248249if self
250.registrations
251 .deregister(&mut self.synced.lock(), registration)
252 {
253self.unpark();
254 }
255256self.metrics.dec_fd_count();
257258Ok(())
259 }
260261fn release_pending_registrations(&self) {
262if self.registrations.needs_release() {
263self.registrations.release(&mut self.synced.lock());
264 }
265 }
266}
267268impl fmt::Debug for Handle {
269fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270write!(f, "Handle")
271 }
272}
273274impl Direction {
275pub(super) fn mask(self) -> Ready {
276match self {
277 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
278 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
279 }
280 }
281}