h2/proto/streams/
flow_control.rs

1use crate::frame::Reason;
2use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
3
4use std::fmt;
5
// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
// aggregate them when the changes are significant. Many implementations do
// this by keeping a "ratio" of the update versus the allowed window size.
9//
10// While some may wish to represent this ratio as percentage, using a f32,
11// we skip having to deal with float math and stick to integers. To do so,
12// the "ratio" is represented by 2 i32s, split into the numerator and
13// denominator. For example, a 50% ratio is simply represented as 1/2.
14//
// An example applying this ratio: If a stream has an allowed window size of
// 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change
// reaches 1/2 of that window, or 50 bytes.
/// Numerator of the fraction of the window that must be unclaimed before a
/// WINDOW_UPDATE is scheduled.
const UNCLAIMED_NUMERATOR: i32 = 1;
/// Denominator of that fraction; with the numerator it encodes a ratio of 1/2.
const UNCLAIMED_DENOMINATOR: i32 = 2;
20
/// Guards the ratio constants: the denominator must be a valid divisor, the
/// numerator must not be negative, and the ratio must stay below 1.
#[test]
#[allow(clippy::assertions_on_constants)]
fn sanity_unclaimed_ratio() {
    assert!(0 < UNCLAIMED_DENOMINATOR);
    assert!(0 <= UNCLAIMED_NUMERATOR);
    assert!(UNCLAIMED_DENOMINATOR > UNCLAIMED_NUMERATOR);
}
28
29#[derive(Copy, Clone, Debug)]
30pub struct FlowControl {
31    /// Window the peer knows about.
32    ///
33    /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
34    ///
35    /// For example, say the peer sends a request and uses 32kb of the window.
36    /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
37    /// its understanding of the capacity of the window, and that would be:
38    ///
39    /// ```notrust
40    /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
41    /// ```
42    window_size: Window,
43
44    /// Window that we know about.
45    ///
46    /// This can go negative if a user declares a smaller target window than
47    /// the peer knows about.
48    available: Window,
49}
50
51impl FlowControl {
52    pub fn new() -> FlowControl {
53        FlowControl {
54            window_size: Window(0),
55            available: Window(0),
56        }
57    }
58
59    /// Returns the window size as known by the peer
60    pub fn window_size(&self) -> WindowSize {
61        self.window_size.as_size()
62    }
63
64    /// Returns the window size available to the consumer
65    pub fn available(&self) -> Window {
66        self.available
67    }
68
69    /// Returns true if there is unavailable window capacity
70    pub fn has_unavailable(&self) -> bool {
71        if self.window_size < 0 {
72            return false;
73        }
74
75        self.window_size > self.available
76    }
77
78    pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
79        self.available.decrease_by(capacity)
80    }
81
82    pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
83        self.available.increase_by(capacity)
84    }
85
86    /// If a WINDOW_UPDATE frame should be sent, returns a positive number
87    /// representing the increment to be used.
88    ///
89    /// If there is no available bytes to be reclaimed, or the number of
90    /// available bytes does not reach the threshold, this returns `None`.
91    ///
92    /// This represents pending outbound WINDOW_UPDATE frames.
93    pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
94        let available = self.available;
95
96        if self.window_size >= available {
97            return None;
98        }
99
100        let unclaimed = available.0 - self.window_size.0;
101        let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
102
103        if unclaimed < threshold {
104            None
105        } else {
106            Some(unclaimed as WindowSize)
107        }
108    }
109
110    /// Increase the window size.
111    ///
112    /// This is called after receiving a WINDOW_UPDATE frame
113    pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
114        let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
115
116        if overflow {
117            return Err(Reason::FLOW_CONTROL_ERROR);
118        }
119
120        if val > MAX_WINDOW_SIZE as i32 {
121            return Err(Reason::FLOW_CONTROL_ERROR);
122        }
123
124        tracing::trace!(
125            "inc_window; sz={}; old={}; new={}",
126            sz,
127            self.window_size,
128            val
129        );
130
131        self.window_size = Window(val);
132        Ok(())
133    }
134
135    /// Decrement the send-side window size.
136    ///
137    /// This is called after receiving a SETTINGS frame with a lower
138    /// INITIAL_WINDOW_SIZE value.
139    pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
140        tracing::trace!(
141            "dec_window; sz={}; window={}, available={}",
142            sz,
143            self.window_size,
144            self.available
145        );
146        // ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
147        self.window_size.decrease_by(sz)?;
148        Ok(())
149    }
150
151    /// Decrement the recv-side window size.
152    ///
153    /// This is called after receiving a SETTINGS ACK frame with a lower
154    /// INITIAL_WINDOW_SIZE value.
155    pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
156        tracing::trace!(
157            "dec_recv_window; sz={}; window={}, available={}",
158            sz,
159            self.window_size,
160            self.available
161        );
162        // This should not be able to overflow `window_size` from the bottom.
163        self.window_size.decrease_by(sz)?;
164        self.available.decrease_by(sz)?;
165        Ok(())
166    }
167
168    /// Decrements the window reflecting data has actually been sent. The caller
169    /// must ensure that the window has capacity.
170    pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
171        tracing::trace!(
172            "send_data; sz={}; window={}; available={}",
173            sz,
174            self.window_size,
175            self.available
176        );
177
178        // If send size is zero it's meaningless to update flow control window
179        if sz > 0 {
180            // Ensure that the argument is correct
181            assert!(self.window_size.0 >= sz as i32);
182
183            // Update values
184            self.window_size.decrease_by(sz)?;
185            self.available.decrease_by(sz)?;
186        }
187        Ok(())
188    }
189}
190
/// A flow-controlled window's current capacity, stored as a signed value.
///
/// The value may drop below zero: one side can already have used some of the
/// capacity by the time the other side advertises a smaller window.
///
/// Additions to and subtractions from the capacity are meant to live on this
/// type, so that integer casts are not scattered throughout the source.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd)]
pub struct Window(i32);
200
201impl Window {
202    pub fn as_size(&self) -> WindowSize {
203        if self.0 < 0 {
204            0
205        } else {
206            self.0 as WindowSize
207        }
208    }
209
210    pub fn checked_size(&self) -> WindowSize {
211        assert!(self.0 >= 0, "negative Window");
212        self.0 as WindowSize
213    }
214
215    pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
216        if let Some(v) = self.0.checked_sub(other as i32) {
217            self.0 = v;
218            Ok(())
219        } else {
220            Err(Reason::FLOW_CONTROL_ERROR)
221        }
222    }
223
224    pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
225        let other = self.add(other)?;
226        self.0 = other.0;
227        Ok(())
228    }
229
230    pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
231        if let Some(v) = self.0.checked_add(other as i32) {
232            Ok(Self(v))
233        } else {
234            Err(Reason::FLOW_CONTROL_ERROR)
235        }
236    }
237}
238
239impl PartialEq<usize> for Window {
240    fn eq(&self, other: &usize) -> bool {
241        if self.0 < 0 {
242            false
243        } else {
244            (self.0 as usize).eq(other)
245        }
246    }
247}
248
249impl PartialOrd<usize> for Window {
250    fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
251        if self.0 < 0 {
252            Some(::std::cmp::Ordering::Less)
253        } else {
254            (self.0 as usize).partial_cmp(other)
255        }
256    }
257}
258
259impl fmt::Display for Window {
260    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261        fmt::Display::fmt(&self.0, f)
262    }
263}
264
265impl From<Window> for isize {
266    fn from(w: Window) -> isize {
267        w.0 as isize
268    }
269}