1use crate::frame::Reason;
2use crate::proto::{WindowSize, MAX_WINDOW_SIZE};
34use std::fmt;
56// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
7// aggregate them when the changes are significant. Many implementations do
8// this by keeping a "ratio" of the update version the allowed window size.
9//
10// While some may wish to represent this ratio as percentage, using a f32,
11// we skip having to deal with float math and stick to integers. To do so,
12// the "ratio" is represented by 2 i32s, split into the numerator and
13// denominator. For example, a 50% ratio is simply represented as 1/2.
14//
15// An example applying this ratio: If a stream has an allowed window size of
16// 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change
17// becomes greater than 1/2, or 50 bytes.
18const UNCLAIMED_NUMERATOR: i32 = 1;
19const UNCLAIMED_DENOMINATOR: i32 = 2;
2021#[test]
22#[allow(clippy::assertions_on_constants)]
23fn sanity_unclaimed_ratio() {
24assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR);
25assert!(UNCLAIMED_NUMERATOR >= 0);
26assert!(UNCLAIMED_DENOMINATOR > 0);
27}
2829#[derive(Copy, Clone, Debug)]
30pub struct FlowControl {
31/// Window the peer knows about.
32 ///
33 /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received.
34 ///
35 /// For example, say the peer sends a request and uses 32kb of the window.
36 /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust
37 /// its understanding of the capacity of the window, and that would be:
38 ///
39 /// ```notrust
40 /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb
41 /// ```
42window_size: Window,
4344/// Window that we know about.
45 ///
46 /// This can go negative if a user declares a smaller target window than
47 /// the peer knows about.
48available: Window,
49}
5051impl FlowControl {
52pub fn new() -> FlowControl {
53 FlowControl {
54 window_size: Window(0),
55 available: Window(0),
56 }
57 }
5859/// Returns the window size as known by the peer
60pub fn window_size(&self) -> WindowSize {
61self.window_size.as_size()
62 }
6364/// Returns the window size available to the consumer
65pub fn available(&self) -> Window {
66self.available
67 }
6869/// Returns true if there is unavailable window capacity
70pub fn has_unavailable(&self) -> bool {
71if self.window_size < 0 {
72return false;
73 }
7475self.window_size > self.available
76 }
7778pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
79self.available.decrease_by(capacity)
80 }
8182pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
83self.available.increase_by(capacity)
84 }
8586/// If a WINDOW_UPDATE frame should be sent, returns a positive number
87 /// representing the increment to be used.
88 ///
89 /// If there is no available bytes to be reclaimed, or the number of
90 /// available bytes does not reach the threshold, this returns `None`.
91 ///
92 /// This represents pending outbound WINDOW_UPDATE frames.
93pub fn unclaimed_capacity(&self) -> Option<WindowSize> {
94let available = self.available;
9596if self.window_size >= available {
97return None;
98 }
99100let unclaimed = available.0 - self.window_size.0;
101let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
102103if unclaimed < threshold {
104None
105} else {
106Some(unclaimed as WindowSize)
107 }
108 }
109110/// Increase the window size.
111 ///
112 /// This is called after receiving a WINDOW_UPDATE frame
113pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
114let (val, overflow) = self.window_size.0.overflowing_add(sz as i32);
115116if overflow {
117return Err(Reason::FLOW_CONTROL_ERROR);
118 }
119120if val > MAX_WINDOW_SIZE as i32 {
121return Err(Reason::FLOW_CONTROL_ERROR);
122 }
123124tracing::trace!(
125"inc_window; sz={}; old={}; new={}",
126 sz,
127self.window_size,
128 val
129 );
130131self.window_size = Window(val);
132Ok(())
133 }
134135/// Decrement the send-side window size.
136 ///
137 /// This is called after receiving a SETTINGS frame with a lower
138 /// INITIAL_WINDOW_SIZE value.
139pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
140tracing::trace!(
141"dec_window; sz={}; window={}, available={}",
142 sz,
143self.window_size,
144self.available
145 );
146// ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
147self.window_size.decrease_by(sz)?;
148Ok(())
149 }
150151/// Decrement the recv-side window size.
152 ///
153 /// This is called after receiving a SETTINGS ACK frame with a lower
154 /// INITIAL_WINDOW_SIZE value.
155pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
156tracing::trace!(
157"dec_recv_window; sz={}; window={}, available={}",
158 sz,
159self.window_size,
160self.available
161 );
162// This should not be able to overflow `window_size` from the bottom.
163self.window_size.decrease_by(sz)?;
164self.available.decrease_by(sz)?;
165Ok(())
166 }
167168/// Decrements the window reflecting data has actually been sent. The caller
169 /// must ensure that the window has capacity.
170pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
171tracing::trace!(
172"send_data; sz={}; window={}; available={}",
173 sz,
174self.window_size,
175self.available
176 );
177178// If send size is zero it's meaningless to update flow control window
179if sz > 0 {
180// Ensure that the argument is correct
181assert!(self.window_size.0 >= sz as i32);
182183// Update values
184self.window_size.decrease_by(sz)?;
185self.available.decrease_by(sz)?;
186 }
187Ok(())
188 }
189}
190191/// The current capacity of a flow-controlled Window.
192///
193/// This number can go negative when either side has used a certain amount
194/// of capacity when the other side advertises a reduction in size.
195///
196/// This type tries to centralize the knowledge of addition and subtraction
197/// to this capacity, instead of having integer casts throughout the source.
198#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd)]
199pub struct Window(i32);
200201impl Window {
202pub fn as_size(&self) -> WindowSize {
203if self.0 < 0 {
2040
205} else {
206self.0 as WindowSize
207 }
208 }
209210pub fn checked_size(&self) -> WindowSize {
211assert!(self.0 >= 0, "negative Window");
212self.0 as WindowSize
213 }
214215pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
216if let Some(v) = self.0.checked_sub(other as i32) {
217self.0 = v;
218Ok(())
219 } else {
220Err(Reason::FLOW_CONTROL_ERROR)
221 }
222 }
223224pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
225let other = self.add(other)?;
226self.0 = other.0;
227Ok(())
228 }
229230pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
231if let Some(v) = self.0.checked_add(other as i32) {
232Ok(Self(v))
233 } else {
234Err(Reason::FLOW_CONTROL_ERROR)
235 }
236 }
237}
238239impl PartialEq<usize> for Window {
240fn eq(&self, other: &usize) -> bool {
241if self.0 < 0 {
242false
243} else {
244 (self.0 as usize).eq(other)
245 }
246 }
247}
248249impl PartialOrd<usize> for Window {
250fn partial_cmp(&self, other: &usize) -> Option<::std::cmp::Ordering> {
251if self.0 < 0 {
252Some(::std::cmp::Ordering::Less)
253 } else {
254 (self.0 as usize).partial_cmp(other)
255 }
256 }
257}
258259impl fmt::Display for Window {
260fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261 fmt::Display::fmt(&self.0, f)
262 }
263}
264265impl From<Window> for isize {
266fn from(w: Window) -> isize {
267 w.0 as isize
268 }
269}