hyper/proto/h2/
ping.rs

1/// HTTP2 Ping usage
2///
3/// hyper uses HTTP2 pings for two purposes:
4///
5/// 1. Adaptive flow control using BDP
6/// 2. Connection keep-alive
7///
8/// Both cases are optional.
9///
10/// # BDP Algorithm
11///
12/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
13///   1a. Record current time.
14///   1b. Send a BDP ping.
15/// 2. Increment the number of received bytes.
16/// 3. When the BDP ping ack is received:
17///   3a. Record duration from sent time.
18///   3b. Merge RTT with a running average.
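///   3c. Calculate the bandwidth as bytes / (rtt * 1.5).
///   3d. If the bandwidth is a new maximum and the sampled bytes are at
///       least 2/3 of the current bdp, set bdp to double the sampled bytes
///       (capped at 16 MiB) and update windows.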
21
22#[cfg(feature = "runtime")]
23use std::fmt;
24#[cfg(feature = "runtime")]
25use std::future::Future;
26#[cfg(feature = "runtime")]
27use std::pin::Pin;
28use std::sync::{Arc, Mutex};
29use std::task::{self, Poll};
30use std::time::Duration;
31#[cfg(not(feature = "runtime"))]
32use std::time::Instant;
33
34use h2::{Ping, PingPong};
35#[cfg(feature = "runtime")]
36use tokio::time::{Instant, Sleep};
37use tracing::{debug, trace};
38
/// Unit used in this module for BDP estimates and window-size updates.
type WindowSize = u32;
40
41pub(super) fn disabled() -> Recorder {
42    Recorder { shared: None }
43}
44
45pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) {
46    debug_assert!(
47        config.is_enabled(),
48        "ping channel requires bdp or keep-alive config",
49    );
50
51    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
52        bdp: wnd,
53        max_bandwidth: 0.0,
54        rtt: 0.0,
55        ping_delay: Duration::from_millis(100),
56        stable_count: 0,
57    });
58
59    let (bytes, next_bdp_at) = if bdp.is_some() {
60        (Some(0), Some(Instant::now()))
61    } else {
62        (None, None)
63    };
64
65    #[cfg(feature = "runtime")]
66    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
67        interval,
68        timeout: config.keep_alive_timeout,
69        while_idle: config.keep_alive_while_idle,
70        timer: Box::pin(tokio::time::sleep(interval)),
71        state: KeepAliveState::Init,
72    });
73
74    #[cfg(feature = "runtime")]
75    let last_read_at = keep_alive.as_ref().map(|_| Instant::now());
76
77    let shared = Arc::new(Mutex::new(Shared {
78        bytes,
79        #[cfg(feature = "runtime")]
80        last_read_at,
81        #[cfg(feature = "runtime")]
82        is_keep_alive_timed_out: false,
83        ping_pong,
84        ping_sent_at: None,
85        next_bdp_at,
86    }));
87
88    (
89        Recorder {
90            shared: Some(shared.clone()),
91        },
92        Ponger {
93            bdp,
94            #[cfg(feature = "runtime")]
95            keep_alive,
96            shared,
97        },
98    )
99}
100
/// Settings selecting which ping-driven features a connection uses.
#[derive(Clone)]
pub(super) struct Config {
    /// If `Some`, BDP sampling is enabled and the value seeds the initial
    /// BDP estimate.
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_while_idle: bool,
}
115
/// Cloneable handle used to report reads into the shared ping state.
///
/// A `Recorder` whose `shared` is `None` is disabled: its recording methods
/// return without doing anything.
#[derive(Clone)]
pub(crate) struct Recorder {
    /// Shared ping state, or `None` when this recorder is disabled.
    shared: Option<Arc<Mutex<Shared>>>,
}
120
/// The polling half of the ping channel: it waits for pongs and turns them
/// into BDP updates and keep-alive decisions.
pub(super) struct Ponger {
    /// BDP estimator state; `None` when BDP sampling is disabled.
    bdp: Option<Bdp>,
    /// Keep-alive timer state; `None` when keep-alive is disabled (it is
    /// also set to `None` once a keep-alive timeout has been reported).
    #[cfg(feature = "runtime")]
    keep_alive: Option<KeepAlive>,
    /// State shared with every `Recorder` created for this connection.
    shared: Arc<Mutex<Shared>>,
}
127
/// State shared between the `Ponger` and all `Recorder` handles.
struct Shared {
    /// The h2 handle used to send pings and poll for pongs.
    ping_pong: PingPong,
    /// When the outstanding ping was sent; `None` if no ping is in flight.
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send fewer pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant is when
    /// the connection last read a frame.
    #[cfg(feature = "runtime")]
    last_read_at: Option<Instant>,

    /// Set once the keep-alive timeout has elapsed without a pong.
    #[cfg(feature = "runtime")]
    is_keep_alive_timed_out: bool,
}
149
/// Running state of the bandwidth-delay-product estimator.
struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds, kept as a moving average of the samples.
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}
164
/// Keep-alive configuration together with its timer and state machine.
#[cfg(feature = "runtime")]
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,

    /// Where the keep-alive cycle currently is.
    state: KeepAliveState,
    /// A single timer, reset either to the next ping deadline or to the
    /// pong timeout depending on `state`.
    timer: Pin<Box<Sleep>>,
}
178
/// Phases of the keep-alive cycle.
#[cfg(feature = "runtime")]
enum KeepAliveState {
    /// No ping deadline is armed yet.
    Init,
    /// The timer is armed for the last read time plus the interval.
    Scheduled,
    /// A keep-alive ping was issued and the timer now tracks the timeout.
    PingSent,
}
185
/// Events produced by `Ponger::poll`.
pub(super) enum Ponged {
    /// The BDP estimate grew; carries the new window size.
    SizeUpdate(WindowSize),
    /// No pong arrived within the keep-alive timeout.
    #[cfg(feature = "runtime")]
    KeepAliveTimedOut,
}
191
/// Error marker for a keep-alive ping that was not answered in time.
#[cfg(feature = "runtime")]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;
195
196// ===== impl Config =====
197
198impl Config {
199    pub(super) fn is_enabled(&self) -> bool {
200        #[cfg(feature = "runtime")]
201        {
202            self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
203        }
204
205        #[cfg(not(feature = "runtime"))]
206        {
207            self.bdp_initial_window.is_some()
208        }
209    }
210}
211
212// ===== impl Recorder =====
213
214impl Recorder {
215    pub(crate) fn record_data(&self, len: usize) {
216        let shared = if let Some(ref shared) = self.shared {
217            shared
218        } else {
219            return;
220        };
221
222        let mut locked = shared.lock().unwrap();
223
224        #[cfg(feature = "runtime")]
225        locked.update_last_read_at();
226
227        // are we ready to send another bdp ping?
228        // if not, we don't need to record bytes either
229
230        if let Some(ref next_bdp_at) = locked.next_bdp_at {
231            if Instant::now() < *next_bdp_at {
232                return;
233            } else {
234                locked.next_bdp_at = None;
235            }
236        }
237
238        if let Some(ref mut bytes) = locked.bytes {
239            *bytes += len;
240        } else {
241            // no need to send bdp ping if bdp is disabled
242            return;
243        }
244
245        if !locked.is_ping_sent() {
246            locked.send_ping();
247        }
248    }
249
250    pub(crate) fn record_non_data(&self) {
251        #[cfg(feature = "runtime")]
252        {
253            let shared = if let Some(ref shared) = self.shared {
254                shared
255            } else {
256                return;
257            };
258
259            let mut locked = shared.lock().unwrap();
260
261            locked.update_last_read_at();
262        }
263    }
264
265    /// If the incoming stream is already closed, convert self into
266    /// a disabled reporter.
267    #[cfg(feature = "client")]
268    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
269        if stream.is_end_stream() {
270            disabled()
271        } else {
272            self
273        }
274    }
275
276    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
277        #[cfg(feature = "runtime")]
278        {
279            if let Some(ref shared) = self.shared {
280                let locked = shared.lock().unwrap();
281                if locked.is_keep_alive_timed_out {
282                    return Err(KeepAliveTimedOut.crate_error());
283                }
284            }
285        }
286
287        // else
288        Ok(())
289    }
290}
291
292// ===== impl Ponger =====
293
294impl Ponger {
295    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
296        let now = Instant::now();
297        let mut locked = self.shared.lock().unwrap();
298        #[cfg(feature = "runtime")]
299        let is_idle = self.is_idle();
300
301        #[cfg(feature = "runtime")]
302        {
303            if let Some(ref mut ka) = self.keep_alive {
304                ka.schedule(is_idle, &locked);
305                ka.maybe_ping(cx, &mut locked);
306            }
307        }
308
309        if !locked.is_ping_sent() {
310            // XXX: this doesn't register a waker...?
311            return Poll::Pending;
312        }
313
314        match locked.ping_pong.poll_pong(cx) {
315            Poll::Ready(Ok(_pong)) => {
316                let start = locked
317                    .ping_sent_at
318                    .expect("pong received implies ping_sent_at");
319                locked.ping_sent_at = None;
320                let rtt = now - start;
321                trace!("recv pong");
322
323                #[cfg(feature = "runtime")]
324                {
325                    if let Some(ref mut ka) = self.keep_alive {
326                        locked.update_last_read_at();
327                        ka.schedule(is_idle, &locked);
328                    }
329                }
330
331                if let Some(ref mut bdp) = self.bdp {
332                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
333                    locked.bytes = Some(0); // reset
334                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
335
336                    let update = bdp.calculate(bytes, rtt);
337                    locked.next_bdp_at = Some(now + bdp.ping_delay);
338                    if let Some(update) = update {
339                        return Poll::Ready(Ponged::SizeUpdate(update));
340                    }
341                }
342            }
343            Poll::Ready(Err(e)) => {
344                debug!("pong error: {}", e);
345            }
346            Poll::Pending => {
347                #[cfg(feature = "runtime")]
348                {
349                    if let Some(ref mut ka) = self.keep_alive {
350                        if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
351                            self.keep_alive = None;
352                            locked.is_keep_alive_timed_out = true;
353                            return Poll::Ready(Ponged::KeepAliveTimedOut);
354                        }
355                    }
356                }
357            }
358        }
359
360        // XXX: this doesn't register a waker...?
361        Poll::Pending
362    }
363
364    #[cfg(feature = "runtime")]
365    fn is_idle(&self) -> bool {
366        Arc::strong_count(&self.shared) <= 2
367    }
368}
369
370// ===== impl Shared =====
371
372impl Shared {
373    fn send_ping(&mut self) {
374        match self.ping_pong.send_ping(Ping::opaque()) {
375            Ok(()) => {
376                self.ping_sent_at = Some(Instant::now());
377                trace!("sent ping");
378            }
379            Err(err) => {
380                debug!("error sending ping: {}", err);
381            }
382        }
383    }
384
385    fn is_ping_sent(&self) -> bool {
386        self.ping_sent_at.is_some()
387    }
388
389    #[cfg(feature = "runtime")]
390    fn update_last_read_at(&mut self) {
391        if self.last_read_at.is_some() {
392            self.last_read_at = Some(Instant::now());
393        }
394    }
395
396    #[cfg(feature = "runtime")]
397    fn last_read_at(&self) -> Instant {
398        self.last_read_at.expect("keep_alive expects last_read_at")
399    }
400}
401
402// ===== impl Bdp =====
403
/// Upper bound, in bytes (16 MiB), for the BDP estimate.
///
/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 1024 * 1024 * 16;
406
407impl Bdp {
408    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
409        // No need to do any math if we're at the limit.
410        if self.bdp as usize == BDP_LIMIT {
411            self.stabilize_delay();
412            return None;
413        }
414
415        // average the rtt
416        let rtt = seconds(rtt);
417        if self.rtt == 0.0 {
418            // First sample means rtt is first rtt.
419            self.rtt = rtt;
420        } else {
421            // Weigh this rtt as 1/8 for a moving average.
422            self.rtt += (rtt - self.rtt) * 0.125;
423        }
424
425        // calculate the current bandwidth
426        let bw = (bytes as f64) / (self.rtt * 1.5);
427        trace!("current bandwidth = {:.1}B/s", bw);
428
429        if bw < self.max_bandwidth {
430            // not a faster bandwidth, so don't update
431            self.stabilize_delay();
432            return None;
433        } else {
434            self.max_bandwidth = bw;
435        }
436
437        // if the current `bytes` sample is at least 2/3 the previous
438        // bdp, increase to double the current sample.
439        if bytes >= self.bdp as usize * 2 / 3 {
440            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
441            trace!("BDP increased to {}", self.bdp);
442
443            self.stable_count = 0;
444            self.ping_delay /= 2;
445            Some(self.bdp)
446        } else {
447            self.stabilize_delay();
448            None
449        }
450    }
451
452    fn stabilize_delay(&mut self) {
453        if self.ping_delay < Duration::from_secs(10) {
454            self.stable_count += 1;
455
456            if self.stable_count >= 2 {
457                self.ping_delay *= 4;
458                self.stable_count = 0;
459            }
460        }
461    }
462}
463
/// Converts a `Duration` into fractional seconds.
///
/// Uses `Duration::as_secs_f64`, which computes whole seconds plus
/// sub-second nanoseconds divided by 1e9, the same arithmetic the previous
/// hand-written version performed.
fn seconds(dur: Duration) -> f64 {
    dur.as_secs_f64()
}
469
470// ===== impl KeepAlive =====
471
#[cfg(feature = "runtime")]
impl KeepAlive {
    /// Arms the timer for the next keep-alive ping when the state allows it.
    ///
    /// - `Init`: arm unless the connection is idle and `while_idle` is off.
    /// - `PingSent`: arm once no ping is outstanding any more.
    /// - `Scheduled`: already armed, nothing to do.
    ///
    /// The deadline is the last read time plus the interval.
    fn schedule(&mut self, is_idle: bool, shared: &Shared) {
        let should_arm = match self.state {
            KeepAliveState::Init => self.while_idle || !is_idle,
            KeepAliveState::PingSent => !shared.is_ping_sent(),
            KeepAliveState::Scheduled => false,
        };
        if !should_arm {
            return;
        }

        self.state = KeepAliveState::Scheduled;
        let deadline = shared.last_read_at() + self.interval;
        self.timer.as_mut().reset(deadline);
    }

    /// Sends the keep-alive ping once the scheduled deadline has passed.
    ///
    /// Only acts in the `Scheduled` state. If a frame was read after the
    /// timer was armed, the state goes back to `Init` and the task is woken
    /// so a fresh deadline gets scheduled instead of pinging.
    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) {
        match self.state {
            KeepAliveState::Scheduled => {}
            KeepAliveState::Init | KeepAliveState::PingSent => return,
        }

        if self.timer.as_mut().poll(cx).is_pending() {
            return;
        }

        // check if we've received a frame while we were scheduled
        let read_since_armed = shared.last_read_at() + self.interval > self.timer.deadline();
        if read_since_armed {
            self.state = KeepAliveState::Init;
            cx.waker().wake_by_ref(); // schedule us again
            return;
        }

        trace!("keep-alive interval ({:?}) reached", self.interval);
        shared.send_ping();
        self.state = KeepAliveState::PingSent;
        // From here on the timer tracks how long we wait for the pong.
        let timeout_at = Instant::now() + self.timeout;
        self.timer.as_mut().reset(timeout_at);
    }

    /// Checks whether the pong timeout has elapsed.
    ///
    /// Returns `Err(KeepAliveTimedOut)` only in the `PingSent` state with an
    /// expired timer; every other case yields `Ok(())`.
    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
        match self.state {
            KeepAliveState::PingSent => {}
            KeepAliveState::Init | KeepAliveState::Scheduled => return Ok(()),
        }

        if self.timer.as_mut().poll(cx).is_pending() {
            return Ok(());
        }

        trace!("keep-alive timeout ({:?}) reached", self.timeout);
        Err(KeepAliveTimedOut)
    }
}
533
534// ===== impl KeepAliveTimedOut =====
535
#[cfg(feature = "runtime")]
impl KeepAliveTimedOut {
    /// Wraps this marker in a crate-level HTTP/2 error, attaching `self`
    /// to it via `with`.
    pub(super) fn crate_error(self) -> crate::Error {
        let error = crate::Error::new(crate::error::Kind::Http2);
        error.with(self)
    }
}
542
#[cfg(feature = "runtime")]
impl fmt::Display for KeepAliveTimedOut {
    /// Writes the fixed message for a keep-alive timeout.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "keep-alive timed out")
    }
}
549
#[cfg(feature = "runtime")]
impl std::error::Error for KeepAliveTimedOut {
    /// Reports the crate's `TimedOut` error as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = &crate::error::TimedOut;
        Some(cause)
    }
}
555}