metrics_util/
recoverable.rs

1use std::sync::{Arc, Weak};
2
3use metrics::{
4    Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
5    Unit,
6};
7
/// Handle that owns the strong reference to a recorder wrapped by `RecoverableRecorder`.
///
/// While this handle is alive, the recorder it holds stays alive. The recorder can be taken back
/// out with [`RecoveryHandle::into_inner`].
pub struct RecoveryHandle<R> {
    handle: Arc<R>,
}

impl<R> RecoveryHandle<R> {
    /// Consumes the handle, returning the original recorder.
    ///
    /// This method retries until this handle holds the only strong reference to the recorder,
    /// i.e. until nothing else is actively using it. Outstanding weak references do not prevent
    /// recovery. Between attempts the current thread yields to the scheduler so that whichever
    /// thread still holds a strong reference gets a chance to release it, rather than competing
    /// with a hot spin loop.
    ///
    /// It is not advised to call this method under heavy load, as doing so is not deterministic or
    /// ordered and may block for an indefinite amount of time.
    pub fn into_inner(mut self) -> R {
        loop {
            match Arc::try_unwrap(self.handle) {
                Ok(recorder) => break recorder,
                Err(handle) => {
                    // Someone else still holds a strong reference: put the `Arc` back so the next
                    // iteration can retry, and give other threads a chance to run and drop theirs.
                    self.handle = handle;
                    std::thread::yield_now();
                }
            }
        }
    }
}
32
33/// Wraps a recorder to allow for recovering it after being installed.
34///
35/// Installing a recorder generally involves providing an owned value, which means that it is not
36/// possible to recover the recorder after it has been installed. For some recorder implementations,
37/// it can be important to perform finalization before the application exits, which is not possible
38/// if the application cannot consume the recorder.
39///
40/// `RecoverableRecorder` allows wrapping a recorder such that a weak reference to it is installed
41/// globally, while the recorder itself is held by `RecoveryHandle<R>`. This allows the recorder to
42/// be used globally so long as the recovery handle is active, keeping the original recorder alive.
43///
44/// ## As a drop guard
45///
46/// While `RecoveryHandle<R>` provides a method to manually recover the recorder directly, one
47/// particular benefit is that due to how the recorder is wrapped, when `RecoveryHandle<R>` is
48/// dropped, and the last active reference to the wrapped recorder is dropped, the recorder itself
49/// will be dropped.
50///
51/// This allows using `RecoveryHandle<R>` as a drop guard, ensuring that by dropping it, the
52/// recorder itself will be dropped, and any finalization logic implemented for the recorder will be
53/// run.
54pub struct RecoverableRecorder<R> {
55    handle: Arc<R>,
56}
57
58impl<R: Recorder + 'static> RecoverableRecorder<R> {
59    /// Creates a new `RecoverableRecorder` from the given recorder.
60    pub fn new(recorder: R) -> Self {
61        Self { handle: Arc::new(recorder) }
62    }
63
64    /// Builds the wrapped recorder and a handle to recover the original.
65    pub(self) fn build(self) -> (WeakRecorder<R>, RecoveryHandle<R>) {
66        let wrapped = WeakRecorder::from_arc(&self.handle);
67
68        (wrapped, RecoveryHandle { handle: self.handle })
69    }
70
71    /// Installs the wrapped recorder globally, returning a handle to recover it.
72    ///
73    /// A weakly-referenced version of the recorder is installed globally, while the original
74    /// recorder is held within `RecoverableRecorder`, and can be recovered by calling `into_inner`.
75    ///
76    /// # Errors
77    ///
78    /// If a recorder is already installed, an error is returned containing the original recorder.
79    pub fn install(self) -> Result<RecoveryHandle<R>, SetRecorderError<R>> {
80        let (wrapped, handle) = self.build();
81        match metrics::set_global_recorder(wrapped) {
82            Ok(()) => Ok(handle),
83            Err(_) => {
84                let recorder = handle.into_inner();
85                Err(SetRecorderError(recorder))
86            }
87        }
88    }
89}
90
91struct WeakRecorder<R> {
92    recorder: Weak<R>,
93}
94
95impl<R> WeakRecorder<R> {
96    fn from_arc(recorder: &Arc<R>) -> Self {
97        Self { recorder: Arc::downgrade(recorder) }
98    }
99}
100
101impl<R: Recorder> Recorder for WeakRecorder<R> {
102    fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
103        if let Some(recorder) = self.recorder.upgrade() {
104            recorder.describe_counter(key, unit, description);
105        }
106    }
107
108    fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
109        if let Some(recorder) = self.recorder.upgrade() {
110            recorder.describe_gauge(key, unit, description);
111        }
112    }
113
114    fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
115        if let Some(recorder) = self.recorder.upgrade() {
116            recorder.describe_histogram(key, unit, description);
117        }
118    }
119
120    fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
121        if let Some(recorder) = self.recorder.upgrade() {
122            recorder.register_counter(key, metadata)
123        } else {
124            Counter::noop()
125        }
126    }
127
128    fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
129        if let Some(recorder) = self.recorder.upgrade() {
130            recorder.register_gauge(key, metadata)
131        } else {
132            Gauge::noop()
133        }
134    }
135
136    fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
137        if let Some(recorder) = self.recorder.upgrade() {
138            recorder.register_histogram(key, metadata)
139        } else {
140            Histogram::noop()
141        }
142    }
143}
144
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn, Key, Recorder};

    /// Atomic integer used as the backing storage of the test counter.
    struct CounterWrapper(AtomicU64);

    impl CounterWrapper {
        /// Reads the current value of the counter.
        fn get(&self) -> u64 {
            let Self(total) = self;
            total.load(Ordering::Acquire)
        }
    }

    impl CounterFn for CounterWrapper {
        fn increment(&self, value: u64) {
            self.0.fetch_add(value, Ordering::Release);
        }

        fn absolute(&self, value: u64) {
            self.0.store(value, Ordering::Release);
        }
    }

    /// Atomic integer used as the backing storage of the test gauge.
    ///
    /// Gauge values arrive as `f64` and are truncated to `u64` before being applied.
    struct GaugeWrapper(AtomicU64);

    impl GaugeWrapper {
        /// Reads the current value of the gauge.
        fn get(&self) -> u64 {
            let Self(level) = self;
            level.load(Ordering::Acquire)
        }
    }

    impl GaugeFn for GaugeWrapper {
        fn increment(&self, value: f64) {
            let delta = value as u64;
            self.0.fetch_add(delta, Ordering::Release);
        }

        fn decrement(&self, value: f64) {
            let delta = value as u64;
            self.0.fetch_sub(delta, Ordering::Release);
        }

        fn set(&self, value: f64) {
            let level = value as u64;
            self.0.store(level, Ordering::Release);
        }
    }

    /// Atomic integer that accumulates the sum of all samples recorded to the test histogram.
    struct HistogramWrapper(AtomicU64);

    impl HistogramWrapper {
        /// Reads the accumulated sum of recorded samples.
        fn get(&self) -> u64 {
            let Self(sum) = self;
            sum.load(Ordering::Acquire)
        }
    }

    impl HistogramFn for HistogramWrapper {
        fn record(&self, value: f64) {
            let sample = value as u64;
            self.0.fetch_add(sample, Ordering::Release);
        }
    }

    /// Recorder that always hands out the same counter, gauge and histogram, and flags when it
    /// has been dropped.
    struct TestRecorder {
        dropped: Arc<AtomicBool>,
        counter: Arc<CounterWrapper>,
        gauge: Arc<GaugeWrapper>,
        histogram: Arc<HistogramWrapper>,
    }

    impl TestRecorder {
        /// Creates a recorder along with shared handles to its three metric cells.
        fn new() -> (Self, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>) {
            let (recorder, _, counter, gauge, histogram) = Self::new_with_drop();
            (recorder, counter, gauge, histogram)
        }

        /// Creates a recorder along with its drop flag and shared handles to its metric cells.
        fn new_with_drop(
        ) -> (Self, Arc<AtomicBool>, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>)
        {
            let recorder = Self {
                dropped: Arc::new(AtomicBool::new(false)),
                counter: Arc::new(CounterWrapper(AtomicU64::new(0))),
                gauge: Arc::new(GaugeWrapper(AtomicU64::new(0))),
                histogram: Arc::new(HistogramWrapper(AtomicU64::new(0))),
            };

            let dropped = Arc::clone(&recorder.dropped);
            let counter = Arc::clone(&recorder.counter);
            let gauge = Arc::clone(&recorder.gauge);
            let histogram = Arc::clone(&recorder.histogram);

            (recorder, dropped, counter, gauge, histogram)
        }
    }

    impl Recorder for TestRecorder {
        fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
            todo!()
        }

        fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
            todo!()
        }

        fn describe_histogram(
            &self,
            _key: KeyName,
            _unit: Option<Unit>,
            _description: SharedString,
        ) {
            todo!()
        }

        fn register_counter(&self, _key: &Key, _metadata: &Metadata<'_>) -> Counter {
            let shared = Arc::clone(&self.counter);
            Counter::from_arc(shared)
        }

        fn register_gauge(&self, _key: &Key, _metadata: &Metadata<'_>) -> Gauge {
            let shared = Arc::clone(&self.gauge);
            Gauge::from_arc(shared)
        }

        fn register_histogram(&self, _key: &Key, _metadata: &Metadata<'_>) -> Histogram {
            let shared = Arc::clone(&self.histogram);
            Histogram::from_arc(shared)
        }
    }

    impl Drop for TestRecorder {
        fn drop(&mut self) {
            // Let the test observe that the recorder really went away.
            self.dropped.store(true, Ordering::Release);
        }
    }

    /// Emits the first round of metrics through `recorder`: counter +5, gauge +5.0 twice, and
    /// three histogram samples of 5.0.
    fn record_first_round(recorder: &WeakRecorder<TestRecorder>) {
        metrics::with_local_recorder(recorder, || {
            metrics::counter!("counter").increment(5);
            metrics::gauge!("gauge").increment(5.0);
            metrics::gauge!("gauge").increment(5.0);
            metrics::histogram!("histogram").record(5.0);
            metrics::histogram!("histogram").record(5.0);
            metrics::histogram!("histogram").record(5.0);
        });
    }

    /// Emits the second round of metrics through `recorder`: one update of 7 to each metric.
    fn record_second_round(recorder: &WeakRecorder<TestRecorder>) {
        metrics::with_local_recorder(recorder, || {
            metrics::counter!("counter").increment(7);
            metrics::gauge!("gauge").increment(7.0);
            metrics::histogram!("histogram").record(7.0);
        });
    }

    /// Asserts that the cells reflect the first round of metrics and nothing more.
    fn assert_first_round_totals(
        counter: &CounterWrapper,
        gauge: &GaugeWrapper,
        histogram: &HistogramWrapper,
    ) {
        assert_eq!(counter.get(), 5);
        assert_eq!(gauge.get(), 10);
        assert_eq!(histogram.get(), 15);
    }

    #[test]
    fn basic() {
        // Wrap the test recorder and split it into the weak recorder and its recovery handle.
        let (test_recorder, counter, gauge, histogram) = TestRecorder::new();
        let (weak, handle) = RecoverableRecorder::new(test_recorder).build();

        // While the handle is alive, updates must reach the underlying cells.
        record_first_round(&weak);

        // Recover the recorder; it stays alive in `_recovered` until the end of the test.
        let _recovered = handle.into_inner();
        assert_first_round_totals(&counter, &gauge, &histogram);

        // After recovery the weak recorder can no longer reach the original, so further updates
        // must leave the cells untouched.
        record_second_round(&weak);
        assert_first_round_totals(&counter, &gauge, &histogram);
    }

    #[test]
    fn on_drop() {
        // Wrap the test recorder and split it into the weak recorder and its recovery handle.
        let (test_recorder, dropped, counter, gauge, histogram) = TestRecorder::new_with_drop();
        let (weak, handle) = RecoverableRecorder::new(test_recorder).build();

        // While the handle is alive, updates must reach the underlying cells.
        record_first_round(&weak);

        // Recover the recorder and drop it right away.
        drop(handle.into_inner());
        assert_first_round_totals(&counter, &gauge, &histogram);

        // With the recorder gone, further updates must leave the cells untouched.
        record_second_round(&weak);
        assert_first_round_totals(&counter, &gauge, &histogram);

        // The recorder's `Drop` implementation must have run.
        assert!(dropped.load(Ordering::Acquire));
    }
}