sketches_ddsketch/
store.rs
1#[cfg(feature = "use_serde")]
2use serde::{Deserialize, Serialize};
3
/// Allocation granularity of the bin vector, in bins: a requested length is
/// rounded up to a multiple of this value before the store's bin limit is applied.
const CHUNK_SIZE: i32 = 128;
5
/// Divides `dividend` by `divisor`, rounding the result towards positive infinity.
///
/// Works from the truncated quotient and the remainder instead of adding
/// `divisor - 1` to the dividend, so it cannot overflow for dividends close to
/// `i32::MAX` and it stays correct for negative operands.
///
/// # Panics
///
/// Panics if `divisor` is zero.
fn div_ceil(dividend: i32, divisor: i32) -> i32 {
    let quotient = dividend / divisor;
    let remainder = dividend % divisor;

    // Truncation already rounds up when the exact result is negative. A bump is
    // only needed when the exact result is positive and fractional, i.e. when the
    // remainder is non-zero and has the same sign as the divisor.
    if remainder != 0 && (remainder > 0) == (divisor > 0) {
        quotient + 1
    } else {
        quotient
    }
}
12
/// Dense store of per-key counts backed by a contiguous vector of bins.
///
/// The number of bins is capped at `bin_limit`; when the range of keys no longer
/// fits, the lowest keys are folded together into the lowest bin.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "use_serde", derive(Serialize, Deserialize))]
pub struct Store {
    /// Per-key counters; the counter for key `k` is stored at index `k - offset`.
    bins: Vec<u64>,
    /// Total of all counts recorded through `add` and `merge`.
    count: u64,
    /// Lowest key currently tracked (`i32::MAX` in a freshly created store).
    min_key: i32,
    /// Highest key currently tracked (`i32::MIN` in a freshly created store).
    max_key: i32,
    /// Key that maps to index 0 of `bins`.
    offset: i32,
    /// Upper bound on the number of bins that will be allocated.
    bin_limit: usize,
    /// Set to `true` once the key range exceeded the available bins and the
    /// lowest keys were folded into the lowest bin.
    is_collapsed: bool,
}
25
26impl Store {
27 pub fn new(bin_limit: usize) -> Self {
28 Store {
29 bins: Vec::new(),
30 count: 0,
31 min_key: i32::MAX,
32 max_key: i32::MIN,
33 offset: 0,
34 bin_limit,
35 is_collapsed: false,
36 }
37 }
38
39 pub fn length(&self) -> i32 {
41 self.bins.len() as i32
42 }
43
44 pub fn is_empty(&self) -> bool {
45 self.bins.is_empty()
46 }
47
48 pub fn add(&mut self, key: i32) {
49 let idx = self.get_index(key);
50 self.bins[idx] += 1;
51 self.count += 1;
52 }
53
54 fn get_index(&mut self, key: i32) -> usize {
55 if key < self.min_key {
56 if self.is_collapsed {
57 return 0;
58 }
59
60 self.extend_range(key, None);
61 if self.is_collapsed {
62 return 0;
63 }
64 } else if key > self.max_key {
65 self.extend_range(key, None);
66 }
67
68 (key - self.offset) as usize
69 }
70
71 fn extend_range(&mut self, key: i32, second_key: Option<i32>) {
72 let second_key = second_key.unwrap_or(key);
73 let new_min_key = i32::min(key, i32::min(second_key, self.min_key));
74 let new_max_key = i32::max(key, i32::max(second_key, self.max_key));
75
76 if self.is_empty() {
77 let new_len = self.get_new_length(new_min_key, new_max_key);
78 self.bins.resize(new_len, 0);
79 self.offset = new_min_key;
80 self.adjust(new_min_key, new_max_key);
81 } else if new_min_key >= self.min_key && new_max_key < self.offset + self.length() {
82 self.min_key = new_min_key;
83 self.max_key = new_max_key;
84 } else {
85 let new_length = self.get_new_length(new_min_key, new_max_key);
87 if new_length > self.length() as usize {
88 self.bins.resize(new_length, 0);
89 }
90 self.adjust(new_min_key, new_max_key);
91 }
92 }
93
94 fn get_new_length(&self, new_min_key: i32, new_max_key: i32) -> usize {
95 let desired_length = new_max_key - new_min_key + 1;
96 usize::min(
97 (CHUNK_SIZE * div_ceil(desired_length, CHUNK_SIZE)) as usize,
98 self.bin_limit,
99 )
100 }
101
102 fn adjust(&mut self, new_min_key: i32, new_max_key: i32) {
103 if new_max_key - new_min_key + 1 > self.length() {
104 let new_min_key = new_max_key - self.length() + 1;
105
106 if new_min_key >= self.max_key {
107 self.offset = new_min_key;
109 self.min_key = new_min_key;
110 self.bins.fill(0);
111 self.bins[0] = self.count;
112 } else {
113 let shift = self.offset - new_min_key;
114 if shift < 0 {
115 let collapse_start_index = (self.min_key - self.offset) as usize;
116 let collapse_end_index = (new_min_key - self.offset) as usize;
117 let collapsed_count: u64 = self.bins[collapse_start_index..collapse_end_index]
118 .iter()
119 .sum();
120 let zero_len = (new_min_key - self.min_key) as usize;
121 self.bins.splice(
122 collapse_start_index..collapse_end_index,
123 std::iter::repeat(0).take(zero_len),
124 );
125 self.bins[collapse_end_index] += collapsed_count;
126 }
127 self.min_key = new_min_key;
128 self.shift_bins(shift);
129 }
130
131 self.max_key = new_max_key;
132 self.is_collapsed = true;
133 } else {
134 self.center_bins(new_min_key, new_max_key);
135 self.min_key = new_min_key;
136 self.max_key = new_max_key;
137 }
138 }
139
140 fn shift_bins(&mut self, shift: i32) {
141 if shift > 0 {
142 let shift = shift as usize;
143 self.bins.rotate_right(shift);
144 for idx in 0..shift {
145 self.bins[idx] = 0;
146 }
147 } else {
148 let shift = shift.abs() as usize;
149 for idx in 0..shift {
150 self.bins[idx] = 0;
151 }
152 self.bins.rotate_left(shift);
153 }
154
155 self.offset -= shift;
156 }
157
158 fn center_bins(&mut self, new_min_key: i32, new_max_key: i32) {
159 let middle_key = new_min_key + (new_max_key - new_min_key + 1) / 2;
160 let shift = self.offset + self.length() / 2 - middle_key;
161 self.shift_bins(shift)
162 }
163
164 pub fn key_at_rank(&self, rank: u64) -> i32 {
165 let mut n = 0;
166 for (i, bin) in self.bins.iter().enumerate() {
167 n += *bin;
168 if n > rank {
169 return i as i32 + self.offset;
170 }
171 }
172
173 self.max_key
174 }
175
176 pub fn count(&self) -> u64 {
177 self.count
178 }
179
180 pub fn merge(&mut self, other: &Store) {
181 if other.count == 0 {
182 return;
183 }
184
185 if self.count == 0 {
186 self.copy(other);
187 return;
188 }
189
190 if other.min_key < self.min_key || other.max_key > self.max_key {
191 self.extend_range(other.min_key, Some(other.max_key));
192 }
193
194 let collapse_start_index = other.min_key - other.offset;
195 let mut collapse_end_index = i32::min(self.min_key, other.max_key + 1) - other.offset;
196 if collapse_end_index > collapse_start_index {
197 let collapsed_count: u64 = self.bins
198 [collapse_start_index as usize..collapse_end_index as usize]
199 .iter()
200 .sum();
201 self.bins[0] += collapsed_count;
202 } else {
203 collapse_end_index = collapse_start_index;
204 }
205
206 for key in (collapse_end_index + other.offset)..(other.max_key + 1) {
207 self.bins[(key - self.offset) as usize] += other.bins[(key - other.offset) as usize]
208 }
209
210 self.count += other.count;
211 }
212
213 fn copy(&mut self, o: &Store) {
214 self.bins = o.bins.clone();
215 self.count = o.count;
216 self.min_key = o.min_key;
217 self.max_key = o.max_key;
218 self.offset = o.offset;
219 self.bin_limit = o.bin_limit;
220 self.is_collapsed = o.is_collapsed;
221 }
222}
223
#[cfg(test)]
mod tests {
    use crate::store::Store;

    /// Ascending keys that exactly fill the bin limit are all kept individually.
    #[test]
    fn test_simple_store() {
        let mut s = Store::new(2048);

        for i in 0..2048 {
            s.add(i);
        }

        assert_eq!(s.count(), 2048);
        assert_eq!(s.key_at_rank(0), 0);
        assert_eq!(s.key_at_rank(2047), 2047);
    }

    /// Same keys as `test_simple_store`, inserted from highest to lowest.
    #[test]
    fn test_simple_store_rev() {
        let mut s = Store::new(2048);

        // `2048..0` is an empty range and would never call `add`; reverse the
        // ascending range to actually walk the keys downwards.
        for i in (0..2048).rev() {
            s.add(i);
        }

        assert_eq!(s.count(), 2048);
        assert_eq!(s.key_at_rank(0), 0);
        assert_eq!(s.key_at_rank(2047), 2047);
    }
}