1#![cfg_attr(not(feature = "full"), allow(dead_code))]
23//! Yield points for improved cooperative scheduling.
4//!
5//! Documentation for this can be found in the [`tokio::task`] module.
6//!
//! [`tokio::task`]: crate::task
89// ```ignore
10// # use tokio_stream::{Stream, StreamExt};
11// async fn drop_all<I: Stream + Unpin>(mut input: I) {
12// while let Some(_) = input.next().await {
13// tokio::coop::proceed().await;
14// }
15// }
16// ```
17//
18// The `proceed` future will coordinate with the executor to make sure that
19// every so often control is yielded back to the executor so it can run other
20// tasks.
21//
22// # Placing yield points
23//
24// Voluntary yield points should be placed _after_ at least some work has been
25// done. If they are not, a future sufficiently deep in the task hierarchy may
26// end up _never_ getting to run because of the number of yield points that
27// inevitably appear before it is reached. In general, you will want yield
28// points to only appear in "leaf" futures -- those that do not themselves poll
29// other futures. By doing this, you avoid double-counting each iteration of
30// the outer future against the cooperating budget.
3132use crate::runtime::context;
/// Opaque counter for how much more "work" a task may perform before it
/// has to yield back to the scheduler.
///
/// `Some(n)` means `n` units of work remain; `None` means the task is not
/// constrained at all.
#[derive(Clone, Copy, Debug)]
pub(crate) struct Budget(Option<u8>);
/// Outcome of an attempt to consume one unit of budget.
pub(crate) struct BudgetDecrement {
    /// `true` when a unit could be consumed (or the budget is unconstrained).
    success: bool,
    /// `true` when this decrement consumed the last remaining unit.
    hit_zero: bool,
}
4344impl Budget {
45/// Budget assigned to a task on each poll.
46 ///
47 /// The value itself is chosen somewhat arbitrarily. It needs to be high
48 /// enough to amortize wakeup and scheduling costs, but low enough that we
49 /// do not starve other tasks for too long. The value also needs to be high
50 /// enough that particularly deep tasks are able to do at least some useful
51 /// work at all.
52 ///
53 /// Note that as more yield points are added in the ecosystem, this value
54 /// will probably also have to be raised.
55const fn initial() -> Budget {
56 Budget(Some(128))
57 }
5859/// Returns an unconstrained budget. Operations will not be limited.
60pub(super) const fn unconstrained() -> Budget {
61 Budget(None)
62 }
6364fn has_remaining(self) -> bool {
65self.0.map_or(true, |budget| budget > 0)
66 }
67}
6869/// Runs the given closure with a cooperative task budget. When the function
70/// returns, the budget is reset to the value prior to calling the function.
71#[inline(always)]
72pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
73 with_budget(Budget::initial(), f)
74}
7576/// Runs the given closure with an unconstrained task budget. When the function returns, the budget
77/// is reset to the value prior to calling the function.
78#[inline(always)]
79pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
80 with_budget(Budget::unconstrained(), f)
81}
8283#[inline(always)]
84fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
85struct ResetGuard {
86 prev: Budget,
87 }
8889impl Drop for ResetGuard {
90fn drop(&mut self) {
91let _ = context::budget(|cell| {
92 cell.set(self.prev);
93 });
94 }
95 }
9697#[allow(unused_variables)]
98let maybe_guard = context::budget(|cell| {
99let prev = cell.get();
100 cell.set(budget);
101102 ResetGuard { prev }
103 });
104105// The function is called regardless even if the budget is not successfully
106 // set due to the thread-local being destroyed.
107f()
108}
109110#[inline(always)]
111pub(crate) fn has_budget_remaining() -> bool {
112// If the current budget cannot be accessed due to the thread-local being
113 // shutdown, then we assume there is budget remaining.
114context::budget(|cell| cell.get().has_remaining()).unwrap_or(true)
115}
116117cfg_rt_multi_thread! {
118/// Sets the current task's budget.
119pub(crate) fn set(budget: Budget) {
120let _ = context::budget(|cell| cell.set(budget));
121 }
122}
123124cfg_rt! {
125/// Forcibly removes the budgeting constraints early.
126 ///
127 /// Returns the remaining budget
128pub(crate) fn stop() -> Budget {
129 context::budget(|cell| {
130let prev = cell.get();
131 cell.set(Budget::unconstrained());
132 prev
133 }).unwrap_or(Budget::unconstrained())
134 }
135}
136137cfg_coop! {
138use pin_project_lite::pin_project;
139use std::cell::Cell;
140use std::future::Future;
141use std::pin::Pin;
142use std::task::{ready, Context, Poll};
143144#[must_use]
145pub(crate) struct RestoreOnPending(Cell<Budget>);
146147impl RestoreOnPending {
148pub(crate) fn made_progress(&self) {
149self.0.set(Budget::unconstrained());
150 }
151 }
152153impl Drop for RestoreOnPending {
154fn drop(&mut self) {
155// Don't reset if budget was unconstrained or if we made progress.
156 // They are both represented as the remembered budget being unconstrained.
157let budget = self.0.get();
158if !budget.is_unconstrained() {
159let _ = context::budget(|cell| {
160 cell.set(budget);
161 });
162 }
163 }
164 }
165166/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield.
167 ///
168 /// When you call this method, the current budget is decremented. However, to ensure that
169 /// progress is made every time a task is polled, the budget is automatically restored to its
170 /// former value if the returned `RestoreOnPending` is dropped. It is the caller's
171 /// responsibility to call `RestoreOnPending::made_progress` if it made progress, to ensure
172 /// that the budget empties appropriately.
173 ///
174 /// Note that `RestoreOnPending` restores the budget **as it was before `poll_proceed`**.
175 /// Therefore, if the budget is _further_ adjusted between when `poll_proceed` returns and
176 /// `RestRestoreOnPending` is dropped, those adjustments are erased unless the caller indicates
177 /// that progress was made.
178#[inline]
179pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<RestoreOnPending> {
180 context::budget(|cell| {
181let mut budget = cell.get();
182183let decrement = budget.decrement();
184185if decrement.success {
186let restore = RestoreOnPending(Cell::new(cell.get()));
187 cell.set(budget);
188189// avoid double counting
190if decrement.hit_zero {
191 inc_budget_forced_yield_count();
192 }
193194 Poll::Ready(restore)
195 } else {
196 cx.waker().wake_by_ref();
197 Poll::Pending
198 }
199 }).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
200 }
201202cfg_rt! {
203cfg_unstable_metrics! {
204#[inline(always)]
205fn inc_budget_forced_yield_count() {
206let _ = context::with_current(|handle| {
207 handle.scheduler_metrics().inc_budget_forced_yield_count();
208 });
209 }
210 }
211212cfg_not_unstable_metrics! {
213#[inline(always)]
214fn inc_budget_forced_yield_count() {}
215 }
216 }
217218cfg_not_rt! {
219#[inline(always)]
220fn inc_budget_forced_yield_count() {}
221 }
222223impl Budget {
224/// Decrements the budget. Returns `true` if successful. Decrementing fails
225 /// when there is not enough remaining budget.
226fn decrement(&mut self) -> BudgetDecrement {
227if let Some(num) = &mut self.0 {
228if *num > 0 {
229*num -= 1;
230231let hit_zero = *num == 0;
232233 BudgetDecrement { success: true, hit_zero }
234 } else {
235 BudgetDecrement { success: false, hit_zero: false }
236 }
237 } else {
238 BudgetDecrement { success: true, hit_zero: false }
239 }
240 }
241242fn is_unconstrained(self) -> bool {
243self.0.is_none()
244 }
245 }
246247pin_project! {
248/// Future wrapper to ensure cooperative scheduling.
249 ///
250 /// When being polled `poll_proceed` is called before the inner future is polled to check
251 /// if the inner future has exceeded its budget. If the inner future resolves, this will
252 /// automatically call `RestoreOnPending::made_progress` before resolving this future with
253 /// the result of the inner one. If polling the inner future is pending, polling this future
254 /// type will also return a `Poll::Pending`.
255#[must_use = "futures do nothing unless polled"]
256pub(crate) struct Coop<F: Future> {
257#[pin]
258pub(crate) fut: F,
259 }
260 }
261262impl<F: Future> Future for Coop<F> {
263type Output = F::Output;
264265fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
266let coop = ready!(poll_proceed(cx));
267let me = self.project();
268if let Poll::Ready(ret) = me.fut.poll(cx) {
269 coop.made_progress();
270 Poll::Ready(ret)
271 } else {
272 Poll::Pending
273 }
274 }
275 }
276277/// Run a future with a budget constraint for cooperative scheduling.
278 /// If the future exceeds its budget while being polled, control is yielded back to the
279 /// runtime.
280#[inline]
281pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
282 Coop { fut }
283 }
284}
#[cfg(all(test, not(loom)))]
mod test {
    use super::*;

    #[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Reads the currently installed budget, falling back to an
    /// unconstrained one when the context cannot be accessed.
    fn get() -> Budget {
        context::budget(|cell| cell.get()).unwrap_or(Budget::unconstrained())
    }

    #[test]
    fn budgeting() {
        use std::future::poll_fn;
        use tokio_test::*;

        let initial = Budget::initial().0;
        let full = initial.unwrap();

        // Performs a single `poll_proceed` that is expected to be ready.
        let proceed = || assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx)));

        // Outside of `budget` nothing is constrained.
        assert!(get().0.is_none());

        let coop = proceed();

        assert!(get().0.is_none());
        drop(coop);
        assert!(get().0.is_none());

        budget(|| {
            assert_eq!(get().0, initial);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full - 1);
            drop(coop);
            // we didn't make progress
            assert_eq!(get().0, initial);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full - 1);
            coop.made_progress();
            drop(coop);
            // we _did_ make progress
            assert_eq!(get().0.unwrap(), full - 1);

            let coop = proceed();
            assert_eq!(get().0.unwrap(), full - 2);
            coop.made_progress();
            drop(coop);
            assert_eq!(get().0.unwrap(), full - 2);

            // A nested scope starts from a fresh budget ...
            budget(|| {
                assert_eq!(get().0, initial);

                let coop = proceed();
                assert_eq!(get().0.unwrap(), full - 1);
                coop.made_progress();
                drop(coop);
                assert_eq!(get().0.unwrap(), full - 1);
            });

            // ... and the outer budget is back afterwards.
            assert_eq!(get().0.unwrap(), full - 2);
        });

        assert!(get().0.is_none());

        budget(|| {
            let n = get().0.unwrap();

            // Use up the whole budget.
            for _ in 0..n {
                proceed().made_progress();
            }

            let mut exhausted = task::spawn(poll_fn(|cx| {
                let coop = std::task::ready!(poll_proceed(cx));
                coop.made_progress();
                Poll::Ready(())
            }));

            assert_pending!(exhausted.poll());
        });
    }
}