tokio/runtime/io/scheduled_io.rs

use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness with the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}
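
// A quick sanity check of the `repr(align)` ladder above; the expected value
// assumes one of the 128-byte-cache-line targets (x86_64 / aarch64 /
// powerpc64), so the test is gated to those architectures.
#[cfg(all(
    test,
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    )
))]
mod cache_padding_sketch {
    #[test]
    fn scheduled_io_is_aligned_to_two_cache_lines() {
        assert_eq!(std::mem::align_of::<super::ScheduledIo>(), 128);
    }
}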

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    /// Set to `true` by `ScheduledIo::wake` when this waiter is notified.
    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    Init,
    Waiting,
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
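
// Shift-and-mask sketch of the layout table above, using plain arithmetic in
// place of the `bit::Pack` helpers and assuming the 16/15/1 split:
#[cfg(test)]
mod readiness_layout_sketch {
    #[test]
    fn fields_pack_without_overlap() {
        let readiness: usize = 0b1010; // bits 0..16
        let tick: usize = 7; // bits 16..31
        let shutdown: usize = 1; // bit 31

        let packed = readiness | (tick << 16) | (shutdown << 31);

        assert_eq!(packed & 0xFFFF, readiness);
        assert_eq!((packed >> 16) & 0x7FFF, tick);
        assert_eq!((packed >> 31) & 1, shutdown);
    }
}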

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        mio::Token(super::EXPOSE_IO.expose_provenance(self))
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo` into
    /// a permanently shut-down state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current value.
    ///
    /// # Arguments
    /// - `tick_op`: whether to set a new tick or to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
        let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
            // If the io driver is shut down, then you are only allowed to clear readiness.
            debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

            const MAX_TICK: usize = TICK.max_value() + 1;
            let tick = TICK.unpack(curr);

            let new_tick = match tick_op {
                // Trying to clear readiness with an old event!
                Tick::Clear(t) if tick as u8 != t => return None,
                Tick::Clear(t) => t as usize,
                Tick::Set => tick.wrapping_add(1) % MAX_TICK,
            };
            let ready = Ready::from_usize(READINESS.unpack(curr));
            Some(TICK.pack(new_tick, f(ready).as_usize()))
        });
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must** be
    /// done from outside of the lock, otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the collected wakers are woken (emptying the array), and the
    /// lock is re-acquired before iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// This is used to support the `AsyncRead` and `AsyncWrite` polling
    /// methods, which cannot use the `async fn` version. It uses the reserved
    /// reader and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let waker = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match waker {
                Some(waker) => waker.clone_from(cx.waker()),
                None => *waker = Some(cx.waker().clone()),
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}
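
// A minimal sketch of the tick rules in `set_readiness` above, using plain
// `usize` arithmetic in place of `Tick` and the `TICK` pack (the constant
// mirrors the 15-bit field in the layout table; none of these names are
// tokio's): `Tick::Set` advances the tick modulo the field size, and a
// `Tick::Clear` carrying a stale tick must be rejected so an old event
// cannot wipe out readiness delivered by a newer one.
#[cfg(test)]
mod tick_rules_sketch {
    /// One past the largest value a 15-bit tick can hold.
    const MAX_TICK: usize = 1 << 15;

    fn next_tick(tick: usize) -> usize {
        tick.wrapping_add(1) % MAX_TICK
    }

    #[test]
    fn set_wraps_and_stale_clear_is_rejected() {
        assert_eq!(next_tick(0), 1);
        // Wraps to zero instead of spilling into the SHUTDOWN bit.
        assert_eq!(next_tick(MAX_TICK - 1), 0);

        // A `Clear(t)` is honored only when `t` matches the stored tick.
        let stored_tick: usize = 6;
        let stale_event_tick: u8 = 5; // observed before the latest `Set`
        assert_ne!(stored_tick as u8, stale_event_tick, "stale clear must be a no-op");
    }
}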
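
// Sketch of the "collect under the lock, wake after releasing it" pattern
// used by `wake` above, with `std` types standing in for `loom` primitives
// and `WakeList` (all names here are illustrative):
#[cfg(test)]
mod wake_outside_lock_sketch {
    use std::sync::{Arc, Mutex};
    use std::task::{Wake, Waker};

    struct Flag(Mutex<bool>);

    impl Wake for Flag {
        fn wake(self: Arc<Self>) {
            *self.0.lock().unwrap() = true;
        }
    }

    #[test]
    fn wakers_fire_after_the_guard_is_dropped() {
        let flag = Arc::new(Flag(Mutex::new(false)));
        let waiters: Mutex<Vec<Waker>> = Mutex::new(vec![Waker::from(flag.clone())]);

        // Drain the wakers while holding the lock; the guard is a temporary
        // that is dropped at the end of this statement.
        let wakers = std::mem::take(&mut *waiters.lock().unwrap());

        // Only invoke the wakers once the lock is released. A waker that
        // immediately re-polls the task (and therefore re-locks `waiters`)
        // would otherwise deadlock on a non-reentrant mutex.
        for waker in wakers {
            waker.wake();
        }

        assert!(*flag.0.lock().unwrap());
    }
}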
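
// Sketch of the load / register-waker / load-again dance in `poll_readiness`
// above, reduced to an `AtomicBool`. Without the second load, an event that
// fires between the first check and storing the waker would be lost: nobody
// wakes the just-stored waker for an already-delivered event. (Hypothetical
// names, not tokio's API.)
#[cfg(test)]
mod double_check_sketch {
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering::{Acquire, Release};
    use std::sync::{Arc, Mutex};
    use std::task::{Wake, Waker};

    fn poll_ready(ready: &AtomicBool, slot: &Mutex<Option<Waker>>, waker: &Waker) -> bool {
        if ready.load(Acquire) {
            return true;
        }

        // Register interest under the lock...
        *slot.lock().unwrap() = Some(waker.clone());

        // ...then check again, in case readiness changed while we were
        // taking the lock.
        ready.load(Acquire)
    }

    struct Noop;

    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }

    #[test]
    fn event_between_check_and_register_is_not_lost() {
        let ready = AtomicBool::new(false);
        let slot = Mutex::new(None);
        let waker = Waker::from(Arc::new(Noop));

        assert!(!poll_ready(&ready, &slot, &waker)); // pending, waker parked
        ready.store(true, Release);
        assert!(poll_ready(&ready, &slot, &waker)); // second load observes it
    }
}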
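
// Sketch of the mask math in `clear_readiness` above, with raw bits standing
// in for `Ready` (`a - b` on `Ready` is set difference, `& !` here; the bit
// positions are illustrative, not tokio's encoding):
#[cfg(test)]
mod clear_mask_sketch {
    const READABLE: usize = 0b0001;
    const WRITABLE: usize = 0b0010;
    const READ_CLOSED: usize = 0b0100;
    const WRITE_CLOSED: usize = 0b1000;

    #[test]
    fn closed_bits_survive_a_clear() {
        let event_ready = READABLE | READ_CLOSED;

        // Drop the closed bits from the clear mask: closed is a final state.
        let mask_no_closed = event_ready & !(READ_CLOSED | WRITE_CLOSED);

        let curr = READABLE | READ_CLOSED | WRITABLE;
        assert_eq!(curr & !mask_no_closed, WRITABLE | READ_CLOSED);
    }
}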

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after taking the lock; insert into the list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        w.waker.as_mut().unwrap().clone_from(cx.waker());
                        return Poll::Pending;
                    }

                    // Explicitly drop the lock to mark the end of the critical
                    // section. Holding the lock is required for safe access to
                    // fields that are not stored within it, so it helps to
                    // visualize its scope.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: State::Done means it is no longer shared
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event that
                    // notified our waker. This is fine because the future has
                    // not yet returned `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}
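
// A minimal sketch of the `Init` -> `Waiting` -> `Done` progression above,
// with a plain `AtomicBool` standing in for the packed readiness word and no
// intrusive waiter list (all names are illustrative, not tokio's):
#[cfg(test)]
mod readiness_state_machine_sketch {
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
    use std::sync::Arc;
    use std::task::{Context, Poll, Wake, Waker};

    enum State {
        Init,
        Waiting,
        Done,
    }

    struct Sketch<'a> {
        ready: &'a AtomicBool,
        state: State,
    }

    impl Future for Sketch<'_> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            // `Sketch` is `Unpin`, so `&mut Self` is safely accessible.
            let me = Pin::into_inner(self);
            loop {
                match me.state {
                    // First poll: optimistically check readiness, otherwise
                    // "register" and wait.
                    State::Init => {
                        if me.ready.load(SeqCst) {
                            me.state = State::Done;
                        } else {
                            me.state = State::Waiting;
                            return Poll::Pending;
                        }
                    }
                    // Re-polled: either readiness arrived or we keep waiting.
                    State::Waiting => {
                        if me.ready.load(SeqCst) {
                            me.state = State::Done;
                        } else {
                            return Poll::Pending;
                        }
                    }
                    // Terminal state: report the event.
                    State::Done => return Poll::Ready(()),
                }
            }
        }
    }

    struct Noop;

    impl Wake for Noop {
        fn wake(self: Arc<Self>) {}
    }

    #[test]
    fn walks_init_waiting_done() {
        let ready = AtomicBool::new(false);
        let mut fut = Sketch {
            ready: &ready,
            state: State::Init,
        };
        let waker = Waker::from(Arc::new(Noop));
        let mut cx = Context::from_waker(&waker);

        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending()); // Init -> Waiting
        ready.store(true, SeqCst);
        assert!(Pin::new(&mut fut).poll(&mut cx).is_ready()); // Waiting -> Done
    }
}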

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        // The `Waiter` node lives inside this future's own storage, so if the
        // future is dropped before completing (e.g. a timeout fires first), it
        // must be unlinked here; otherwise the list would be left holding a
        // dangling pointer.
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}