metrics_util/registry/
recency.rs

1//! Metric recency.
2//!
3//! `Recency` deals with the concept of removing metrics that have not been updated for a certain
4//! amount of time.  In some use cases, metrics are tied to specific labels which are short-lived,
5//! such as labels referencing a date or a version of software.  When these labels change, exporters
6//! may still be emitting those older metrics which are no longer relevant.  In many cases, a
7//! long-lived application could continue tracking metrics such that the unique number of metrics
8//! grows until a significant portion of memory is required to track them all, even if the majority
9//! of them are no longer used.
10//!
11//! As metrics are typically backed by atomic storage, exporters don't see the individual changes to
12//! a metric, and so need a way to measure if a metric has changed since the last time it was
13//! observed.  This could potentially be achieved by observing the value directly, but metrics like
14//! gauges can be updated in such a way that their value is the same between two observations even
15//! though it had actually been changed in between.
16//!
17//! We solve for this by tracking the generation of a metric, which represents the number of times
18//! it has been modified. In doing so, we can compare the generation of a metric between
19//! observations, which only ever increases monotonically.  This provides a universal mechanism that
20//! works for all metric types.
21//!
22//! `Recency` uses the generation of a metric, along with a measurement of time when a metric is
23//! observed, to build a complete picture that allows deciding if a given metric has gone "idle" or
24//! not, and thus whether it should actually be deleted.
25use std::sync::atomic::{AtomicUsize, Ordering};
26use std::sync::{Arc, Mutex, PoisonError};
27use std::time::Duration;
28use std::{collections::HashMap, ops::DerefMut};
29
30use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
31use quanta::{Clock, Instant};
32
33use crate::Hashable;
34use crate::{
35    kind::MetricKindMask,
36    registry::{AtomicStorage, Registry, Storage},
37    MetricKind,
38};
39
/// The generation of a metric.
///
/// Generations are opaque: they are not meant to be inspected directly, only compared against one
/// another in terms of ordering.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Generation(usize);

/// Generation tracking for a metric.
///
/// Holds a generic interior value, and provides a way to access the value such that each access
/// increments the "generation" of the value.  This provides a means to understand if the value has
/// been updated since the last time it was observed.
///
/// For example, if a gauge was observed to be X at one point in time, and then observed to be X
/// again at a later point in time, it could have changed in between the two observations.  It also
/// may not have changed, and thus `Generational` provides a way to determine if either of these
/// events occurred.
///
/// Cloning a `Generational<T>` clones the inner value but shares the generation counter, as the
/// counter is held behind an [`Arc`].
#[derive(Clone)]
pub struct Generational<T> {
    /// The wrapped value.
    inner: T,
    /// Number of accesses made through [`Generational::with_increment`].
    gen: Arc<AtomicUsize>,
}

impl<T> Generational<T> {
    /// Creates a new `Generational<T>` wrapping `inner`, starting at generation zero.
    fn new(inner: T) -> Generational<T> {
        Generational { inner, gen: Arc::new(AtomicUsize::new(0)) }
    }

    /// Gets a reference to the inner value.
    ///
    /// Accessing the value this way does not touch the generation.
    pub fn get_inner(&self) -> &T {
        &self.inner
    }

    /// Gets the current generation.
    pub fn get_generation(&self) -> Generation {
        Generation(self.gen.load(Ordering::Acquire))
    }

    /// Acquires a reference to the inner value, and increments the generation.
    ///
    /// `f` is called exactly once with a reference to the inner value, and whatever it returns is
    /// handed back to the caller.  Because it is only ever called once, any `FnOnce` closure is
    /// accepted, including closures that move captured state into their result.
    ///
    /// The generation is incremented only after `f` has returned.
    pub fn with_increment<F, V>(&self, f: F) -> V
    where
        F: FnOnce(&T) -> V,
    {
        let result = f(&self.inner);
        let _ = self.gen.fetch_add(1, Ordering::AcqRel);
        result
    }
}
89
90impl<T> CounterFn for Generational<T>
91where
92    T: CounterFn,
93{
94    fn increment(&self, value: u64) {
95        self.with_increment(|c| c.increment(value))
96    }
97
98    fn absolute(&self, value: u64) {
99        self.with_increment(|c| c.absolute(value))
100    }
101}
102
103impl<T> GaugeFn for Generational<T>
104where
105    T: GaugeFn,
106{
107    fn increment(&self, value: f64) {
108        self.with_increment(|g| g.increment(value))
109    }
110
111    fn decrement(&self, value: f64) {
112        self.with_increment(|g| g.decrement(value))
113    }
114
115    fn set(&self, value: f64) {
116        self.with_increment(|g| g.set(value))
117    }
118}
119
120impl<T> HistogramFn for Generational<T>
121where
122    T: HistogramFn,
123{
124    fn record(&self, value: f64) {
125        self.with_increment(|h| h.record(value))
126    }
127}
128
129impl<T> From<Generational<T>> for Counter
130where
131    T: CounterFn + Send + Sync + 'static,
132{
133    fn from(inner: Generational<T>) -> Self {
134        Counter::from_arc(Arc::new(inner))
135    }
136}
137
138impl<T> From<Generational<T>> for Gauge
139where
140    T: GaugeFn + Send + Sync + 'static,
141{
142    fn from(inner: Generational<T>) -> Self {
143        Gauge::from_arc(Arc::new(inner))
144    }
145}
146
147impl<T> From<Generational<T>> for Histogram
148where
149    T: HistogramFn + Send + Sync + 'static,
150{
151    fn from(inner: Generational<T>) -> Self {
152        Histogram::from_arc(Arc::new(inner))
153    }
154}
155
/// Generational metric storage.
///
/// A wrapper around another storage implementation that tracks the "generation" of each metric.
/// The generation is used to detect updates to metrics in cases where the value alone would not
/// be sufficient to be used as an indicator.
pub struct GenerationalStorage<S> {
    /// The storage being wrapped.
    inner: S,
}

impl<S> GenerationalStorage<S> {
    /// Creates a new [`GenerationalStorage`].
    ///
    /// The given `storage` is wrapped as-is, with generational semantics layered on top of it.
    pub fn new(storage: S) -> GenerationalStorage<S> {
        GenerationalStorage { inner: storage }
    }
}
172
173impl<K, S: Storage<K>> Storage<K> for GenerationalStorage<S> {
174    type Counter = Generational<S::Counter>;
175    type Gauge = Generational<S::Gauge>;
176    type Histogram = Generational<S::Histogram>;
177
178    fn counter(&self, key: &K) -> Self::Counter {
179        Generational::new(self.inner.counter(key))
180    }
181
182    fn gauge(&self, key: &K) -> Self::Gauge {
183        Generational::new(self.inner.gauge(key))
184    }
185
186    fn histogram(&self, key: &K) -> Self::Histogram {
187        Generational::new(self.inner.histogram(key))
188    }
189}
190
191/// Generational atomic metric storage.
192///
193/// `GenerationalAtomicStorage` is based on [`AtomicStorage`], but additionally tracks the
194/// "generation" of a metric, which is used to detect updates to metrics where the value otherwise
195/// would not be sufficient to be used as an indicator.
196pub type GenerationalAtomicStorage = GenerationalStorage<AtomicStorage>;
197
198impl GenerationalAtomicStorage {
199    /// Creates a [`GenerationalStorage`] that uses [`AtomicStorage`] as its underlying storage.
200    pub fn atomic() -> Self {
201        Self { inner: AtomicStorage }
202    }
203}
204
205/// Tracks recency of metric updates by their registry generation and time.
206///
207/// In many cases, a user may have a long-running process where metrics are stored over time using
208/// labels that change for some particular reason, leaving behind versions of that metric with
209/// labels that are no longer relevant to the current process state.  This can lead to cases where
210/// metrics that no longer matter are still present in rendered output, adding bloat.
211///
212/// When coupled with [`Registry`], [`Recency`] can be used to track when the last update to a
213/// metric has occurred for the purposes of removing idle metrics from the registry.  In addition,
214/// it will remove the value from the registry itself to reduce the aforementioned bloat.
215///
216/// [`Recency`] is separate from [`Registry`] specifically to avoid imposing any slowdowns when
217/// tracking recency does not matter, despite their otherwise tight coupling.
218pub struct Recency<K> {
219    mask: MetricKindMask,
220    inner: Mutex<(Clock, HashMap<K, (Generation, Instant)>)>,
221    idle_timeout: Option<Duration>,
222}
223
224impl<K> Recency<K>
225where
226    K: Clone + Eq + Hashable,
227{
228    /// Creates a new [`Recency`].
229    ///
230    /// If `idle_timeout` is `None`, no recency checking will occur.  Otherwise, any metric that has
231    /// not been updated for longer than `idle_timeout` will be subject for deletion the next time
232    /// the metric is checked.
233    ///
234    /// The provided `clock` is used for tracking time, while `mask` controls which metrics
235    /// are covered by the recency logic.  For example, if `mask` only contains counters and
236    /// histograms, then gauges will not be considered for recency, and thus will never be deleted.
237    ///
238    /// Refer to the documentation for [`MetricKindMask`](crate::MetricKindMask) for more
239    /// information on defining a metric kind mask.
240    pub fn new(clock: Clock, mask: MetricKindMask, idle_timeout: Option<Duration>) -> Self {
241        Recency { mask, inner: Mutex::new((clock, HashMap::new())), idle_timeout }
242    }
243
244    /// Checks if the given counter should be stored, based on its known recency.
245    ///
246    /// If the given key has been updated recently enough, and should continue to be stored, this
247    /// method will return `true` and will update the last update time internally.  If the given key
248    /// has not been updated recently enough, the key will be removed from the given registry if the
249    /// given generation also matches.
250    pub fn should_store_counter<S>(
251        &self,
252        key: &K,
253        gen: Generation,
254        registry: &Registry<K, S>,
255    ) -> bool
256    where
257        S: Storage<K>,
258    {
259        self.should_store(key, gen, registry, MetricKind::Counter, |registry, key| {
260            registry.delete_counter(key)
261        })
262    }
263
264    /// Checks if the given gauge should be stored, based on its known recency.
265    ///
266    /// If the given key has been updated recently enough, and should continue to be stored, this
267    /// method will return `true` and will update the last update time internally.  If the given key
268    /// has not been updated recently enough, the key will be removed from the given registry if the
269    /// given generation also matches.
270    pub fn should_store_gauge<S>(&self, key: &K, gen: Generation, registry: &Registry<K, S>) -> bool
271    where
272        S: Storage<K>,
273    {
274        self.should_store(key, gen, registry, MetricKind::Gauge, |registry, key| {
275            registry.delete_gauge(key)
276        })
277    }
278
279    /// Checks if the given histogram should be stored, based on its known recency.
280    ///
281    /// If the given key has been updated recently enough, and should continue to be stored, this
282    /// method will return `true` and will update the last update time internally.  If the given key
283    /// has not been updated recently enough, the key will be removed from the given registry if the
284    /// given generation also matches.
285    pub fn should_store_histogram<S>(
286        &self,
287        key: &K,
288        gen: Generation,
289        registry: &Registry<K, S>,
290    ) -> bool
291    where
292        S: Storage<K>,
293    {
294        self.should_store(key, gen, registry, MetricKind::Histogram, |registry, key| {
295            registry.delete_histogram(key)
296        })
297    }
298
299    fn should_store<F, S>(
300        &self,
301        key: &K,
302        gen: Generation,
303        registry: &Registry<K, S>,
304        kind: MetricKind,
305        delete_op: F,
306    ) -> bool
307    where
308        F: Fn(&Registry<K, S>, &K) -> bool,
309        S: Storage<K>,
310    {
311        if let Some(idle_timeout) = self.idle_timeout {
312            if self.mask.matches(kind) {
313                let mut guard = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
314                let (clock, entries) = guard.deref_mut();
315
316                let now = clock.now();
317                let deleted = if let Some((last_gen, last_update)) = entries.get_mut(key) {
318                    // If the value is the same as the latest value we have internally, and
319                    // we're over the idle timeout period, then remove it and continue.
320                    if *last_gen == gen {
321                        // If the delete returns false, that means that our generation counter is
322                        // out-of-date, and that the metric has been updated since, so we don't
323                        // actually want to delete it yet.
324                        (now - *last_update) > idle_timeout && delete_op(registry, key)
325                    } else {
326                        // Value has changed, so mark it such.
327                        *last_update = now;
328                        *last_gen = gen;
329                        false
330                    }
331                } else {
332                    entries.insert(key.clone(), (gen, now));
333                    false
334                };
335
336                if deleted {
337                    entries.remove(key);
338                    return false;
339                }
340            }
341        }
342
343        true
344    }
345}