metrics_util/registry/
mod.rs

1//! High-performance metrics storage.
2
3mod storage;
4use std::{
5    hash::BuildHasherDefault,
6    iter::repeat,
7    sync::{PoisonError, RwLock},
8};
9
10use hashbrown::{hash_map::RawEntryMut, HashMap};
11use metrics::{Key, KeyHasher};
12pub use storage::{AtomicStorage, Storage};
13
14#[cfg(feature = "recency")]
15mod recency;
16
17#[cfg(feature = "recency")]
18#[cfg_attr(docsrs, doc(cfg(feature = "recency")))]
19pub use recency::{
20    Generation, Generational, GenerationalAtomicStorage, GenerationalStorage, Recency,
21};
22
23use crate::Hashable;
24
25type RegistryHasher = KeyHasher;
26type RegistryHashMap<K, V> = HashMap<K, V, BuildHasherDefault<RegistryHasher>>;
27
28/// A high-performance metric registry.
29///
30/// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key.
31/// Metrics themselves are stored in the objects returned by `S`.
32///
33/// ## Using `Registry` as the basis of an exporter
34///
35/// As a reusable building blocking for building exporter implementations, users should look at
36/// [`Key`] and [`AtomicStorage`] to use for their key and storage, respectively.
37///
38/// These two implementations provide behavior that is suitable for most exporters, providing
39/// seamless integration with the existing key type used by the core
40/// [`Recorder`][metrics::Recorder] trait, as well as atomic storage for metrics.
41///
42/// In some cases, users may prefer [`GenerationalAtomicStorage`] when know if a metric has been
43/// touched, even if its value has not changed since the last time it was observed, is necessary.
44///
45/// ## Performance
46///
47/// `Registry` is optimized for reads.
48pub struct Registry<K, S>
49where
50    S: Storage<K>,
51{
52    counters: Vec<RwLock<RegistryHashMap<K, S::Counter>>>,
53    gauges: Vec<RwLock<RegistryHashMap<K, S::Gauge>>>,
54    histograms: Vec<RwLock<RegistryHashMap<K, S::Histogram>>>,
55    shard_mask: usize,
56    storage: S,
57}
58
59impl Registry<Key, AtomicStorage> {
60    /// Creates a new `Registry` using a regular [`Key`] and atomic storage.
61    pub fn atomic() -> Self {
62        let shard_count = std::cmp::max(1, num_cpus::get()).next_power_of_two();
63        let shard_mask = shard_count - 1;
64        let counters =
65            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
66        let gauges =
67            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
68        let histograms =
69            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
70
71        Self { counters, gauges, histograms, shard_mask, storage: AtomicStorage }
72    }
73}
74
75impl<K, S> Registry<K, S>
76where
77    S: Storage<K>,
78{
79    /// Creates a new `Registry`.
80    pub fn new(storage: S) -> Self {
81        let shard_count = std::cmp::max(1, num_cpus::get()).next_power_of_two();
82        let shard_mask = shard_count - 1;
83        let counters =
84            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
85        let gauges =
86            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
87        let histograms =
88            repeat(()).take(shard_count).map(|_| RwLock::new(RegistryHashMap::default())).collect();
89
90        Self { counters, gauges, histograms, shard_mask, storage }
91    }
92
93    /// Removes all metrics from the registry.
94    ///
95    /// This operation is eventually consistent: metrics will be removed piecemeal, and this method
96    /// does not ensure that callers will see the registry as entirely empty at any given point.
97    pub fn clear(&self) {
98        for shard in &self.counters {
99            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
100        }
101        for shard in &self.gauges {
102            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
103        }
104        for shard in &self.histograms {
105            shard.write().unwrap_or_else(PoisonError::into_inner).clear();
106        }
107    }
108
109    /// Visits every counter stored in this registry.
110    ///
111    /// This operation does not lock the entire registry, but proceeds directly through the
112    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
113    /// metric that existed at the exact moment that `visit_counters` was called may not actually be observed
114    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
115    /// the call to `visit_counters`, but before `visit_counters` finishes, may also not be observed.
116    pub fn visit_counters<F>(&self, mut collect: F)
117    where
118        F: FnMut(&K, &S::Counter),
119    {
120        for subshard in self.counters.iter() {
121            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
122            for (key, counter) in shard_read.iter() {
123                collect(key, counter);
124            }
125        }
126    }
127    /// Visits every gauge stored in this registry.
128    ///
129    /// This operation does not lock the entire registry, but proceeds directly through the
130    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
131    /// metric that existed at the exact moment that `visit_gauges` was called may not actually be observed
132    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
133    /// the call to `visit_gauges`, but before `visit_gauges` finishes, may also not be observed.
134    pub fn visit_gauges<F>(&self, mut collect: F)
135    where
136        F: FnMut(&K, &S::Gauge),
137    {
138        for subshard in self.gauges.iter() {
139            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
140            for (key, gauge) in shard_read.iter() {
141                collect(key, gauge);
142            }
143        }
144    }
145
146    /// Visits every histogram stored in this registry.
147    ///
148    /// This operation does not lock the entire registry, but proceeds directly through the
149    /// "subshards" that are kept internally.  As a result, all subshards will be visited, but a
150    /// metric that existed at the exact moment that `visit_histograms` was called may not actually be observed
151    /// if it is deleted before that subshard is reached.  Likewise, a metric that is added after
152    /// the call to `visit_histograms`, but before `visit_histograms` finishes, may also not be observed.
153    pub fn visit_histograms<F>(&self, mut collect: F)
154    where
155        F: FnMut(&K, &S::Histogram),
156    {
157        for subshard in self.histograms.iter() {
158            let shard_read = subshard.read().unwrap_or_else(PoisonError::into_inner);
159            for (key, histogram) in shard_read.iter() {
160                collect(key, histogram);
161            }
162        }
163    }
164
165    /// Retains only counters specified by the predicate.
166    ///
167    /// Remove all counters for which f(&k, &c) returns false. This operation proceeds
168    /// through the "subshards" in the same way as `visit_counters`.
169    pub fn retain_counters<F>(&self, mut f: F)
170    where
171        F: FnMut(&K, &S::Counter) -> bool,
172    {
173        for subshard in self.counters.iter() {
174            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
175            shard_write.retain(|k, c| f(k, c));
176        }
177    }
178
179    /// Retains only gauges specified by the predicate.
180    ///
181    /// Remove all gauges for which f(&k, &g) returns false. This operation proceeds
182    /// through the "subshards" in the same way as `visit_gauges`.
183    pub fn retain_gauges<F>(&self, mut f: F)
184    where
185        F: FnMut(&K, &S::Gauge) -> bool,
186    {
187        for subshard in self.gauges.iter() {
188            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
189            shard_write.retain(|k, g| f(k, g));
190        }
191    }
192
193    /// Retains only histograms specified by the predicate.
194    ///
195    /// Remove all histograms for which f(&k, &h) returns false. This operation proceeds
196    /// through the "subshards" in the same way as `visit_histograms`.
197    pub fn retain_histograms<F>(&self, mut f: F)
198    where
199        F: FnMut(&K, &S::Histogram) -> bool,
200    {
201        for subshard in self.histograms.iter() {
202            let mut shard_write = subshard.write().unwrap_or_else(PoisonError::into_inner);
203            shard_write.retain(|k, h| f(k, h));
204        }
205    }
206}
207
208impl<K, S> Registry<K, S>
209where
210    S: Storage<K>,
211    K: Hashable,
212{
213    #[inline]
214    fn get_hash_and_shard_for_counter(
215        &self,
216        key: &K,
217    ) -> (u64, &RwLock<RegistryHashMap<K, S::Counter>>) {
218        let hash = key.hashable();
219
220        // SAFETY: We initialize vector of subshards with a power-of-two value, and
221        // `self.shard_mask` is `self.counters.len() - 1`, thus we can never have a result from the
222        // masking operation that results in a value which is not in bounds of our subshards vector.
223        let shard = unsafe { self.counters.get_unchecked(hash as usize & self.shard_mask) };
224
225        (hash, shard)
226    }
227
228    #[inline]
229    fn get_hash_and_shard_for_gauge(
230        &self,
231        key: &K,
232    ) -> (u64, &RwLock<RegistryHashMap<K, S::Gauge>>) {
233        let hash = key.hashable();
234
235        // SAFETY: We initialize the vector of subshards with a power-of-two value, and
236        // `self.shard_mask` is `self.gauges.len() - 1`, thus we can never have a result from the
237        // masking operation that results in a value which is not in bounds of our subshards vector.
238        let shard = unsafe { self.gauges.get_unchecked(hash as usize & self.shard_mask) };
239
240        (hash, shard)
241    }
242
243    #[inline]
244    fn get_hash_and_shard_for_histogram(
245        &self,
246        key: &K,
247    ) -> (u64, &RwLock<RegistryHashMap<K, S::Histogram>>) {
248        let hash = key.hashable();
249
250        // SAFETY: We initialize the vector of subshards with a power-of-two value, and
251        // `self.shard_mask` is `self.histograms.len() - 1`, thus we can never have a result from
252        // the masking operation that results in a value which is not in bounds of our subshards
253        // vector.
254        let shard = unsafe { self.histograms.get_unchecked(hash as usize & self.shard_mask) };
255
256        (hash, shard)
257    }
258}
259
260impl<K, S> Registry<K, S>
261where
262    S: Storage<K>,
263    K: Eq + Hashable,
264{
265    /// Deletes a counter from the registry.
266    ///
267    /// Returns `true` if the counter existed and was removed, `false` otherwise.
268    pub fn delete_counter(&self, key: &K) -> bool {
269        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
270        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
271        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
272        if let RawEntryMut::Occupied(entry) = entry {
273            let _ = entry.remove_entry();
274            return true;
275        }
276
277        false
278    }
279
280    /// Deletes a gauge from the registry.
281    ///
282    /// Returns `true` if the gauge existed and was removed, `false` otherwise.
283    pub fn delete_gauge(&self, key: &K) -> bool {
284        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
285        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
286        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
287        if let RawEntryMut::Occupied(entry) = entry {
288            let _ = entry.remove_entry();
289            return true;
290        }
291
292        false
293    }
294
295    /// Deletes a histogram from the registry.
296    ///
297    /// Returns `true` if the histogram existed and was removed, `false` otherwise.
298    pub fn delete_histogram(&self, key: &K) -> bool {
299        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
300        let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
301        let entry = shard_write.raw_entry_mut().from_key_hashed_nocheck(hash, key);
302        if let RawEntryMut::Occupied(entry) = entry {
303            let _ = entry.remove_entry();
304            return true;
305        }
306
307        false
308    }
309
310    /// Gets a copy of an existing counter.
311    pub fn get_counter(&self, key: &K) -> Option<S::Counter> {
312        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
313        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
314        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
315    }
316
317    /// Gets a copy of an existing gauge.
318    pub fn get_gauge(&self, key: &K) -> Option<S::Gauge> {
319        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
320        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
321        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
322    }
323
324    /// Gets a copy of an existing histogram.
325    pub fn get_histogram(&self, key: &K) -> Option<S::Histogram> {
326        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
327        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
328        shard_read.raw_entry().from_key_hashed_nocheck(hash, key).map(|(_, v)| v.clone())
329    }
330}
331
332impl<K, S> Registry<K, S>
333where
334    S: Storage<K>,
335    K: Clone + Eq + Hashable,
336{
337    /// Gets or creates the given counter.
338    ///
339    /// The `op` function will be called for the counter under the given `key`, with the counter
340    /// first being created if it does not already exist.
341    pub fn get_or_create_counter<O, V>(&self, key: &K, op: O) -> V
342    where
343        O: FnOnce(&S::Counter) -> V,
344    {
345        let (hash, shard) = self.get_hash_and_shard_for_counter(key);
346
347        // Try and get the handle if it exists, running our operation if we succeed.
348        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
349        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
350            op(v)
351        } else {
352            // Switch to write guard and insert the handle first.
353            drop(shard_read);
354            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
355            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
356            {
357                v
358            } else {
359                let (_, v) = shard_write
360                    .raw_entry_mut()
361                    .from_key_hashed_nocheck(hash, key)
362                    .or_insert_with(|| (key.clone(), self.storage.counter(key)));
363
364                v
365            };
366
367            op(v)
368        }
369    }
370
371    /// Gets or creates the given gauge.
372    ///
373    /// The `op` function will be called for the gauge under the given `key`, with the gauge
374    /// first being created if it does not already exist.
375    pub fn get_or_create_gauge<O, V>(&self, key: &K, op: O) -> V
376    where
377        O: FnOnce(&S::Gauge) -> V,
378    {
379        let (hash, shard) = self.get_hash_and_shard_for_gauge(key);
380
381        // Try and get the handle if it exists, running our operation if we succeed.
382        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
383        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
384            op(v)
385        } else {
386            // Switch to write guard and insert the handle first.
387            drop(shard_read);
388            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
389            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
390            {
391                v
392            } else {
393                let (_, v) = shard_write
394                    .raw_entry_mut()
395                    .from_key_hashed_nocheck(hash, key)
396                    .or_insert_with(|| (key.clone(), self.storage.gauge(key)));
397
398                v
399            };
400
401            op(v)
402        }
403    }
404
405    /// Gets or creates the given histogram.
406    ///
407    /// The `op` function will be called for the histogram under the given `key`, with the histogram
408    /// first being created if it does not already exist.
409    pub fn get_or_create_histogram<O, V>(&self, key: &K, op: O) -> V
410    where
411        O: FnOnce(&S::Histogram) -> V,
412    {
413        let (hash, shard) = self.get_hash_and_shard_for_histogram(key);
414
415        // Try and get the handle if it exists, running our operation if we succeed.
416        let shard_read = shard.read().unwrap_or_else(PoisonError::into_inner);
417        if let Some((_, v)) = shard_read.raw_entry().from_key_hashed_nocheck(hash, key) {
418            op(v)
419        } else {
420            // Switch to write guard and insert the handle first.
421            drop(shard_read);
422            let mut shard_write = shard.write().unwrap_or_else(PoisonError::into_inner);
423            let v = if let Some((_, v)) = shard_write.raw_entry().from_key_hashed_nocheck(hash, key)
424            {
425                v
426            } else {
427                let (_, v) = shard_write
428                    .raw_entry_mut()
429                    .from_key_hashed_nocheck(hash, key)
430                    .or_insert_with(|| (key.clone(), self.storage.histogram(key)));
431
432                v
433            };
434
435            op(v)
436        }
437    }
438    /// Gets a map of all present counters, mapped by key.
439    ///
440    /// This map is a point-in-time snapshot of the registry.
441    pub fn get_counter_handles(&self) -> HashMap<K, S::Counter> {
442        let mut counters = HashMap::new();
443        self.visit_counters(|k, v| {
444            counters.insert(k.clone(), v.clone());
445        });
446        counters
447    }
448
449    /// Gets a map of all present gauges, mapped by key.
450    ///
451    /// This map is a point-in-time snapshot of the registry.
452    pub fn get_gauge_handles(&self) -> HashMap<K, S::Gauge> {
453        let mut gauges = HashMap::new();
454        self.visit_gauges(|k, v| {
455            gauges.insert(k.clone(), v.clone());
456        });
457        gauges
458    }
459
460    /// Gets a map of all present histograms, mapped by key.
461    ///
462    /// This map is a point-in-time snapshot of the registry.
463    pub fn get_histogram_handles(&self) -> HashMap<K, S::Histogram> {
464        let mut histograms = HashMap::new();
465        self.visit_histograms(|k, v| {
466            histograms.insert(k.clone(), v.clone());
467        });
468        histograms
469    }
470}
471
472#[cfg(test)]
473mod tests {
474    use metrics::{atomics::AtomicU64, CounterFn, Key};
475
476    use super::Registry;
477    use std::sync::{atomic::Ordering, Arc};
478
479    #[test]
480    fn test_registry() {
481        let registry = Registry::atomic();
482        let key = Key::from_name("foobar");
483
484        let entries = registry.get_counter_handles();
485        assert_eq!(entries.len(), 0);
486
487        assert!(registry.get_counter(&key).is_none());
488
489        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));
490
491        let initial_entries = registry.get_counter_handles();
492        assert_eq!(initial_entries.len(), 1);
493
494        let initial_entry: (Key, Arc<AtomicU64>) =
495            initial_entries.into_iter().next().expect("failed to get first entry");
496
497        let (ikey, ivalue) = initial_entry;
498        assert_eq!(ikey, key);
499        assert_eq!(ivalue.load(Ordering::SeqCst), 1);
500
501        registry.get_or_create_counter(&key, |c: &Arc<AtomicU64>| c.increment(1));
502
503        let updated_entries = registry.get_counter_handles();
504        assert_eq!(updated_entries.len(), 1);
505
506        let updated_entry: (Key, Arc<AtomicU64>) =
507            updated_entries.into_iter().next().expect("failed to get updated entry");
508
509        let (ukey, uvalue) = updated_entry;
510        assert_eq!(ukey, key);
511        assert_eq!(uvalue.load(Ordering::SeqCst), 2);
512
513        let value = registry.get_counter(&key).expect("failed to get entry");
514        assert!(Arc::ptr_eq(&value, &uvalue));
515
516        registry.get_or_create_counter(&Key::from_name("baz"), |_| ());
517        assert_eq!(registry.get_counter_handles().len(), 2);
518
519        let mut n = 0;
520        registry.retain_counters(|k, _| {
521            n += 1;
522            k.name().starts_with("foo")
523        });
524        assert_eq!(n, 2);
525        assert_eq!(registry.get_counter_handles().len(), 1);
526
527        assert!(registry.delete_counter(&key));
528
529        let entries = registry.get_counter_handles();
530        assert_eq!(entries.len(), 0);
531    }
532}