sketches_ddsketch/
store.rs

1#[cfg(feature = "use_serde")]
2use serde::{Deserialize, Serialize};
3
/// Allocation granularity for the bin vector: bin lengths are computed as whole multiples of
/// this many bins before being capped at the store's bin limit.
const CHUNK_SIZE: i32 = 128;
5
/// Divides `dividend` by `divisor`, rounding the quotient towards positive infinity.
///
/// Stand-in for the standard library's signed `div_ceil`, which was nightly-only when this
/// helper was written. The quotient/remainder form avoids the intermediate
/// `dividend + divisor - 1` sum, so it neither overflows for large dividends nor rounds the
/// wrong way for negative operands.
///
/// # Panics
///
/// Panics if `divisor` is zero, or if `dividend == i32::MIN` and `divisor == -1`.
fn div_ceil(dividend: i32, divisor: i32) -> i32 {
    let quotient = dividend / divisor;
    let remainder = dividend % divisor;

    // Integer division truncates towards zero, which already rounds *up* whenever the exact
    // quotient is negative. A correction is only needed when the exact quotient is positive
    // and inexact, i.e. when the remainder is non-zero and shares the divisor's sign.
    if remainder != 0 && (remainder > 0) == (divisor > 0) {
        quotient + 1
    } else {
        quotient
    }
}
12
/// Dense bin store for integer keys (`CollapsingLowestDenseStore`).
///
/// Counts are kept in a contiguous vector indexed by `key - offset`. The vector is capped at
/// `bin_limit` entries; when the key range no longer fits, the lowest keys are folded together
/// into the first bin.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Store {
    /// Per-key counters; the counter for `key` lives at index `key - offset`.
    bins: Vec<u64>,
    /// Total number of keys recorded (directly or through merging).
    count: u64,
    /// Lowest key currently tracked; `i32::MAX` in a freshly created store.
    min_key: i32,
    /// Highest key currently tracked; `i32::MIN` in a freshly created store.
    max_key: i32,
    /// Key that corresponds to `bins[0]`.
    offset: i32,
    /// Upper bound on the number of bins.
    bin_limit: usize,
    /// Set once the range has outgrown the bins and low keys have been folded together.
    is_collapsed: bool,
}
25
26impl Store {
27    pub fn new(bin_limit: usize) -> Self {
28        Store {
29            bins: Vec::new(),
30            count: 0,
31            min_key: i32::MAX,
32            max_key: i32::MIN,
33            offset: 0,
34            bin_limit,
35            is_collapsed: false,
36        }
37    }
38
39    /// Return the number of bins.
40    pub fn length(&self) -> i32 {
41        self.bins.len() as i32
42    }
43
44    pub fn is_empty(&self) -> bool {
45        self.bins.is_empty()
46    }
47
48    pub fn add(&mut self, key: i32) {
49        let idx = self.get_index(key);
50        self.bins[idx] += 1;
51        self.count += 1;
52    }
53
54    fn get_index(&mut self, key: i32) -> usize {
55        if key < self.min_key {
56            if self.is_collapsed {
57                return 0;
58            }
59
60            self.extend_range(key, None);
61            if self.is_collapsed {
62                return 0;
63            }
64        } else if key > self.max_key {
65            self.extend_range(key, None);
66        }
67
68        (key - self.offset) as usize
69    }
70
71    fn extend_range(&mut self, key: i32, second_key: Option<i32>) {
72        let second_key = second_key.unwrap_or(key);
73        let new_min_key = i32::min(key, i32::min(second_key, self.min_key));
74        let new_max_key = i32::max(key, i32::max(second_key, self.max_key));
75
76        if self.is_empty() {
77            let new_len = self.get_new_length(new_min_key, new_max_key);
78            self.bins.resize(new_len, 0);
79            self.offset = new_min_key;
80            self.adjust(new_min_key, new_max_key);
81        } else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() {
82            self.min_key = new_min_key;
83            self.max_key = new_max_key;
84        } else {
85            // Grow bins
86            let new_length = self.get_new_length(new_min_key, new_max_key);
87            if new_length > self.length() as usize {
88                self.bins.resize(new_length, 0);
89            }
90            self.adjust(new_min_key, new_max_key);
91        }
92    }
93
94    fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize {
95        let desired_length = new_max_key - new_min_key + 1;
96        usize::min(
97            (CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize,
98            self.bin_limit,
99        )
100    }
101
102    fn adjust(&mut self, new_min_key: i32, new_max_key: i32) {
103        if new_max_key - new_min_key + 1 > self.length() {
104            let new_min_key = new_max_key - self.length() + 1;
105
106            if new_min_key >= self.max_key {
107                // Put everything in the first bin.
108                self.offset = new_min_key;
109                self.min_key = new_min_key;
110                self.bins.fill(0);
111                self.bins[0] = self.count;
112            } else {
113                let shift = self.offset - new_min_key;
114                if shift < 0 {
115                    let collapse_start_index = (self.min_key - self.offset) as usize;
116                    let collapse_end_index = (new_min_key - self.offset) as usize;
117                    let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index]
118                        .iter()
119                        .sum();
120                    let zero_len = (new_min_key - self.min_key) as usize;
121                    self.bins.splice(
122                        collapse_start_index..collapse_end_index,
123                        std::iter::repeat(0).take(zero_len),
124                    );
125                    self.bins[collapse_end_index] += collapsed_count;
126                }
127                self.min_key = new_min_key;
128                self.shift_bins(shift);
129            }
130
131            self.max_key = new_max_key;
132            self.is_collapsed = true;
133        } else {
134            self.center_bins(new_min_key, new_max_key);
135            self.min_key = new_min_key;
136            self.max_key = new_max_key;
137        }
138    }
139
140    fn shift_bins(&mut self, shift: i32) {
141        if shift > 0 {
142            let shift = shift as usize;
143            self.bins.rotate_right(shift);
144            for idx in 0..shift {
145                self.bins[idx] = 0;
146            }
147        } else {
148            let shift = shift.abs() as usize;
149            for idx in 0..shift {
150                self.bins[idx] = 0;
151            }
152            self.bins.rotate_left(shift);
153        }
154
155        self.offset -= shift;
156    }
157
158    fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) {
159        let middle_key = new_min_key + (new_max_key - new_min_key + 1) / 2;
160        let shift = self.offset + self.length() / 2 - middle_key;
161        self.shift_bins(shift)
162    }
163
164    pub fn key_at_rank(&self, rank: u64) -> i32 {
165        let mut n = 0;
166        for (i, bin) in self.bins.iter().enumerate() {
167            n += *bin;
168            if n > rank {
169                return i as i32 + self.offset;
170            }
171        }
172
173        self.max_key
174    }
175
176    pub fn count(&self) -> u64 {
177        self.count
178    }
179
180    pub fn merge(&mut self, other: &Store) {
181        if other.count == 0 {
182            return;
183        }
184
185        if self.count == 0 {
186            self.copy(other);
187            return;
188        }
189
190        if other.min_key < self.min_key || other.max_key > self.max_key {
191            self.extend_range(other.min_key, Some(other.max_key));
192        }
193
194        let collapse_start_index = other.min_key - other.offset;
195        let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset;
196        if collapse_end_index > collapse_start_index {
197            let collapsed_count: u64 = self.bins
198                [collapse_start_index as usize..collapse_end_index as usize]
199                .iter()
200                .sum();
201            self.bins[0] += collapsed_count;
202        } else {
203            collapse_end_index = collapse_start_index;
204        }
205
206        for key in (collapse_end_index + other.offset)..(other.max_key + 1) {
207            self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize]
208        }
209
210        self.count += other.count;
211    }
212
213    fn copy(&mut self, o: &Store) {
214        self.bins = o.bins.clone();
215        self.count = o.count;
216        self.min_key = o.min_key;
217        self.max_key = o.max_key;
218        self.offset = o.offset;
219        self.bin_limit = o.bin_limit;
220        self.is_collapsed = o.is_collapsed;
221    }
222}
223
#[cfg(test)]
mod tests {
    use crate::store::Store;

    /// Adding ascending keys up to the bin limit keeps one bin per key.
    #[test]
    fn test_simple_store() {
        let mut s = Store::new(2048);

        for i in 0..2048 {
            s.add(i);
        }

        assert_eq!(s.count(), 2048);
        assert_eq!(s.key_at_rank(0), 0);
        assert_eq!(s.key_at_rank(2047), 2047);
    }

    /// Adding the same keys in descending order must give the same result.
    #[test]
    fn test_simple_store_rev() {
        let mut s = Store::new(2048);

        // `2048..0` is an empty range and would never call `add`; iterate the ascending
        // range in reverse instead.
        for i in (0..2048).rev() {
            s.add(i);
        }

        assert_eq!(s.count(), 2048);
        assert_eq!(s.key_at_rank(0), 0);
        assert_eq!(s.key_at_rank(2047), 2047);
    }
}