1mod storage;
4use std::{
5 hash::BuildHasherDefault,
6 iter::repeat,
7 sync::{PoisonError, RwLock},
8};
9
10use hashbrown::{hash_map::RawEntryMut, HashMap};
11use metrics::{Key, KeyHasher};
12pub use storage::{AtomicStorage, Storage};
13
14#[cfg(feature = "recency")]
15mod recency;
16
17#[cfg(feature = "recency")]
18#[cfg_attr(docsrs, doc(cfg(feature = "recency")))]
19pub use recency::{
20 Generation, Generational, GenerationalAtomicStorage, GenerationalStorage, Recency,
21};
22
23use crate::Hashable;
24
/// Hasher used by the registry's shard maps; an alias for `metrics::KeyHasher`.
type RegistryHasher = KeyHasher;
/// Hash map type used for every shard: a `hashbrown` map whose hashers are built via
/// `BuildHasherDefault<RegistryHasher>`.
type RegistryHashMap<K, V> = HashMap<K, V, BuildHasherDefault<RegistryHasher>>;
27
/// A registry of metric handles, split across several independently locked hash maps.
///
/// Counters, gauges and histograms are kept in separate shard vectors. Each shard is a
/// `RwLock` around a map from `K` to the handle type that `S` defines for that metric kind.
pub struct Registry<K, S>
where
    S: Storage<K>,
{
    // One locked map per shard, holding counter handles.
    counters: Vec<RwLock<RegistryHashMap<K, S::Counter>>>,
    // One locked map per shard, holding gauge handles.
    gauges: Vec<RwLock<RegistryHashMap<K, S::Gauge>>>,
    // One locked map per shard, holding histogram handles.
    histograms: Vec<RwLock<RegistryHashMap<K, S::Histogram>>>,
    // Bit mask applied to a key's hash to choose a shard index.
    shard_mask: usize,
    // Factory that creates the handle for a key when it is first registered.
    storage: S,
}
58
59impl Registry<Key, AtomicStorage> {
60 pub fn atomic() -> Self {
62 let shard_count = std::cmp::max(1, num_cpus::get()).next_power_of_two();
63 let shard_mask = shard_count - 1;
64 let counters =
65 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
66 let gauges =
67 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
68 let histograms =
69 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
70
71 Self { counters, gauges, histograms, shard_mask, storage: AtomicStorage }
72 }
73}
74
75impl<K, S> Registry<K, S>
76where
77 S: Storage<K>,
78{
79 pub fn new(storage: S) -> Self {
81 let shard_count = std::cmp::max(1, num_cpus::get()).next_power_of_two();
82 let shard_mask = shard_count - 1;
83 let counters =
84 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
85 let gauges =
86 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
87 let histograms =
88 repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
89
90 Self { counters, gauges, histograms, shard_mask, storage }
91 }
92
93 pub fn clear(&self) {
98 for shard in &self.counters {
99 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
100 }
101 for shard in &self.gauges {
102 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
103 }
104 for shard in &self.histograms {
105 shard.write().unwrap_or_else(PoisonError::into_inner).clear();
106 }
107 }
108
109 pub fn visit_counters<F>(&self, mut collect: F)
117 where
118 F: FnMut(&K, &S::Counter),
119 {
120 for subshard in self.counters.iter() {
121 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
122 for (key, counter) in shard_read.iter() {
123 collect(key, counter);
124 }
125 }
126 }
127 pub fn visit_gauges<F>(&self, mut collect: F)
135 where
136 F: FnMut(&K, &S::Gauge),
137 {
138 for subshard in self.gauges.iter() {
139 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
140 for (key, gauge) in shard_read.iter() {
141 collect(key, gauge);
142 }
143 }
144 }
145
146 pub fn visit_histograms<F>(&self, mut collect: F)
154 where
155 F: FnMut(&K, &S::Histogram),
156 {
157 for subshard in self.histograms.iter() {
158 let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
159 for (key, histogram) in shard_read.iter() {
160 collect(key, histogram);
161 }
162 }
163 }
164
165 pub fn retain_counters<F>(&self, mut f: F)
170 where
171 F: FnMut(&K, &S::Counter) -> bool,
172 {
173 for subshard in self.counters.iter() {
174 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
175 shard_write.retain(|k, c| f(k, c));
176 }
177 }
178
179 pub fn retain_gauges<F>(&self, mut f: F)
184 where
185 F: FnMut(&K, &S::Gauge) -> bool,
186 {
187 for subshard in self.gauges.iter() {
188 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
189 shard_write.retain(|k, g| f(k, g));
190 }
191 }
192
193 pub fn retain_histograms<F>(&self, mut f: F)
198 where
199 F: FnMut(&K, &S::Histogram) -> bool,
200 {
201 for subshard in self.histograms.iter() {
202 let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
203 shard_write.retain(|k, h| f(k, h));
204 }
205 }
206}
207
208impl<K, S> Registry<K, S>
209where
210 S: Storage<K>,
211 K: Hashable,
212{
213 #[inline]
214 fn get_hash_and_shard_for_counter(
215 &self,
216 key: &K,
217 ) -> (u64, &RwLock<RegistryHashMap<K, S::Counter>>) {
218 let hash = key.hashable();
219
220 let shard = unsafe { self.counters.get_unchecked(hash as usize & self.shard_mask) };
224
225 (hash, shard)
226 }
227
228 #[inline]
229 fn get_hash_and_shard_for_gauge(
230 &self,
231 key: &K,
232 ) -> (u64, &RwLock<RegistryHashMap<K, S::Gauge>>) {
233 let hash = key.hashable();
234
235 let shard = unsafe { self.gauges.get_unchecked(hash as usize & self.shard_mask) };
239
240 (hash, shard)
241 }
242
243 #[inline]
244 fn get_hash_and_shard_for_histogram(
245 &self,
246 key: &K,
247 ) -> (u64, &RwLock<RegistryHashMap<K, S::Histogram>>) {
248 let hash = key.hashable();
249
250 let shard = unsafe { self.histograms.get_unchecked(hash as usize & self.shard_mask) };
255
256 (hash, shard)
257 }
258}
259
260impl<K, S> Registry<K, S>
261where
262 S: Storage<K>,
263 K: Eq + Hashable,
264{
265 pub fn delete_counter(&self, key: &K) -> bool {
269 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
270 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
271 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
272 if let RawEntryMut::Occupied(entry) = entry {
273 let _ = entry.remove_entry();
274 return true;
275 }
276
277 false
278 }
279
280 pub fn delete_gauge(&self, key: &K) -> bool {
284 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
285 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
286 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
287 if let RawEntryMut::Occupied(entry) = entry {
288 let _ = entry.remove_entry();
289 return true;
290 }
291
292 false
293 }
294
295 pub fn delete_histogram(&self, key: &K) -> bool {
299 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
300 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
301 let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
302 if let RawEntryMut::Occupied(entry) = entry {
303 let _ = entry.remove_entry();
304 return true;
305 }
306
307 false
308 }
309
310 pub fn get_counter(&self, key: &K) -> Option<S::Counter> {
312 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
313 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
314 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
315 }
316
317 pub fn get_gauge(&self, key: &K) -> Option<S::Gauge> {
319 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
320 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
321 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
322 }
323
324 pub fn get_histogram(&self, key: &K) -> Option<S::Histogram> {
326 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
327 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
328 shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
329 }
330}
331
332impl<K, S> Registry<K, S>
333where
334 S: Storage<K>,
335 K: Clone + Eq + Hashable,
336{
337 pub fn get_or_create_counter<O, V>(&self, key: &K, op: O) -> V
342 where
343 O: FnOnce(&S::Counter) -> V,
344 {
345 let (hash, shard) = self.get_hash_and_shard_for_counter(key);
346
347 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
349 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
350 op(v)
351 } else {
352 drop(shard_read);
354 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
355 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
356 {
357 v
358 } else {
359 let (_, v) = shard_write
360 .raw_entry_mut()
361 .from_key_hashed_nocheck(hash, key)
362 .or_insert_with(|| (key.clone(), self.storage.counter(key)));
363
364 v
365 };
366
367 op(v)
368 }
369 }
370
371 pub fn get_or_create_gauge<O, V>(&self, key: &K, op: O) -> V
376 where
377 O: FnOnce(&S::Gauge) -> V,
378 {
379 let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
380
381 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
383 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
384 op(v)
385 } else {
386 drop(shard_read);
388 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
389 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
390 {
391 v
392 } else {
393 let (_, v) = shard_write
394 .raw_entry_mut()
395 .from_key_hashed_nocheck(hash, key)
396 .or_insert_with(|| (key.clone(), self.storage.gauge(key)));
397
398 v
399 };
400
401 op(v)
402 }
403 }
404
405 pub fn get_or_create_histogram<O, V>(&self, key: &K, op: O) -> V
410 where
411 O: FnOnce(&S::Histogram) -> V,
412 {
413 let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
414
415 let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
417 if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
418 op(v)
419 } else {
420 drop(shard_read);
422 let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
423 let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
424 {
425 v
426 } else {
427 let (_, v) = shard_write
428 .raw_entry_mut()
429 .from_key_hashed_nocheck(hash, key)
430 .or_insert_with(|| (key.clone(), self.storage.histogram(key)));
431
432 v
433 };
434
435 op(v)
436 }
437 }
438 pub fn get_counter_handles(&self) -> HashMap<K, S::Counter> {
442 let mut counters = HashMap::new();
443 self.visit_counters(|k, v| {
444 counters.insert(k.clone(), v.clone());
445 });
446 counters
447 }
448
449 pub fn get_gauge_handles(&self) -> HashMap<K, S::Gauge> {
453 let mut gauges = HashMap::new();
454 self.visit_gauges(|k, v| {
455 gauges.insert(k.clone(), v.clone());
456 });
457 gauges
458 }
459
460 pub fn get_histogram_handles(&self) -> HashMap<K, S::Histogram> {
464 let mut histograms = HashMap::new();
465 self.visit_histograms(|k, v| {
466 histograms.insert(k.clone(), v.clone());
467 });
468 histograms
469 }
470}
471
#[cfg(test)]
mod tests {
    use metrics::{atomics::AtomicU64, CounterFn, Key};

    use super::Registry;
    use std::sync::{atomic::Ordering, Arc};

    /// Exercises the counter lifecycle: create, update, look up, retain and delete.
    #[test]
    fn test_registry() {
        let registry = Registry::atomic();
        let key = Key::from_name("foobar");

        // A fresh registry holds no counters at all.
        let entries = registry.get_counter_handles();
        assert_eq!(entries.len(), 0);
        assert!(registry.get_counter(&key).is_none());

        // The first access registers the counter and applies the increment.
        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));

        let initial_entries = registry.get_counter_handles();
        assert_eq!(initial_entries.len(), 1);

        let (ikey, ivalue): (Key, Arc<AtomicU64>) =
            initial_entries.into_iter().next().expect("failed to get first entry");
        assert_eq!(ikey, key);
        assert_eq!(ivalue.load(Ordering::SeqCst), 1);

        // A second access reuses the same entry instead of adding another one.
        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));

        let updated_entries = registry.get_counter_handles();
        assert_eq!(updated_entries.len(), 1);

        let (ukey, uvalue): (Key, Arc<AtomicU64>) =
            updated_entries.into_iter().next().expect("failed to get updated entry");
        assert_eq!(ukey, key);
        assert_eq!(uvalue.load(Ordering::SeqCst), 2);

        // A direct lookup hands back the very same shared handle.
        let value = registry.get_counter(&key).expect("failed to get entry");
        assert!(Arc::ptr_eq(&value, &uvalue));

        // Register a second counter, then filter it back out by name.
        registry.get_or_create_counter(&Key::from_name("baz"), |_| ());
        assert_eq!(registry.get_counter_handles().len(), 2);

        let mut visited = 0;
        registry.retain_counters(|k, _| {
            visited += 1;
            k.name().starts_with("foo")
        });
        assert_eq!(visited, 2);
        assert_eq!(registry.get_counter_handles().len(), 1);

        // Deleting the remaining counter leaves the registry empty again.
        assert!(registry.delete_counter(&key));

        let remaining = registry.get_counter_handles();
        assert_eq!(remaining.len(), 0);
    }
}