1use std::sync::{Arc, Weak};
2
3use metrics::{
4 Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SetRecorderError, SharedString,
5 Unit,
6};
7
8pub struct RecoveryHandle<R> {
9 handle: Arc<R>,
10}
11
12impl<R> RecoveryHandle<R> {
13 pub fn into_inner(mut self) -> R {
22 loop {
23 match Arc::try_unwrap(self.handle) {
24 Ok(recorder) => break recorder,
25 Err(handle) => {
26 self.handle = handle;
27 }
28 }
29 }
30 }
31}
32
33pub struct RecoverableRecorder<R> {
55 handle: Arc<R>,
56}
57
58impl<R: Recorder + 'static> RecoverableRecorder<R> {
59 pub fn new(recorder: R) -> Self {
61 Self { handle: Arc::new(recorder) }
62 }
63
64 pub(self) fn build(self) -> (WeakRecorder<R>, RecoveryHandle<R>) {
66 let wrapped = WeakRecorder::from_arc(&self.handle);
67
68 (wrapped, RecoveryHandle { handle: self.handle })
69 }
70
71 pub fn install(self) -> Result<RecoveryHandle<R>, SetRecorderError<R>> {
80 let (wrapped, handle) = self.build();
81 match metrics::set_global_recorder(wrapped) {
82 Ok(()) => Ok(handle),
83 Err(_) => {
84 let recorder = handle.into_inner();
85 Err(SetRecorderError(recorder))
86 }
87 }
88 }
89}
90
91struct WeakRecorder<R> {
92 recorder: Weak<R>,
93}
94
95impl<R> WeakRecorder<R> {
96 fn from_arc(recorder: &Arc<R>) -> Self {
97 Self { recorder: Arc::downgrade(recorder) }
98 }
99}
100
101impl<R: Recorder> Recorder for WeakRecorder<R> {
102 fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
103 if let Some(recorder) = self.recorder.upgrade() {
104 recorder.describe_counter(key, unit, description);
105 }
106 }
107
108 fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
109 if let Some(recorder) = self.recorder.upgrade() {
110 recorder.describe_gauge(key, unit, description);
111 }
112 }
113
114 fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
115 if let Some(recorder) = self.recorder.upgrade() {
116 recorder.describe_histogram(key, unit, description);
117 }
118 }
119
120 fn register_counter(&self, key: &Key, metadata: &Metadata<'_>) -> Counter {
121 if let Some(recorder) = self.recorder.upgrade() {
122 recorder.register_counter(key, metadata)
123 } else {
124 Counter::noop()
125 }
126 }
127
128 fn register_gauge(&self, key: &Key, metadata: &Metadata<'_>) -> Gauge {
129 if let Some(recorder) = self.recorder.upgrade() {
130 recorder.register_gauge(key, metadata)
131 } else {
132 Gauge::noop()
133 }
134 }
135
136 fn register_histogram(&self, key: &Key, metadata: &Metadata<'_>) -> Histogram {
137 if let Some(recorder) = self.recorder.upgrade() {
138 recorder.register_histogram(key, metadata)
139 } else {
140 Histogram::noop()
141 }
142 }
143}
144
145#[cfg(test)]
146mod tests {
147 use std::sync::atomic::{AtomicBool, Ordering};
148
149 use super::*;
150 use metrics::{atomics::AtomicU64, CounterFn, GaugeFn, HistogramFn, Key, Recorder};
151
152 struct CounterWrapper(AtomicU64);
153 struct GaugeWrapper(AtomicU64);
154 struct HistogramWrapper(AtomicU64);
155
156 impl CounterWrapper {
157 fn get(&self) -> u64 {
158 self.0.load(Ordering::Acquire)
159 }
160 }
161
162 impl GaugeWrapper {
163 fn get(&self) -> u64 {
164 self.0.load(Ordering::Acquire)
165 }
166 }
167
168 impl HistogramWrapper {
169 fn get(&self) -> u64 {
170 self.0.load(Ordering::Acquire)
171 }
172 }
173
174 impl CounterFn for CounterWrapper {
175 fn increment(&self, value: u64) {
176 self.0.fetch_add(value, Ordering::Release);
177 }
178
179 fn absolute(&self, value: u64) {
180 self.0.store(value, Ordering::Release);
181 }
182 }
183
184 impl GaugeFn for GaugeWrapper {
185 fn increment(&self, value: f64) {
186 self.0.fetch_add(value as u64, Ordering::Release);
187 }
188
189 fn decrement(&self, value: f64) {
190 self.0.fetch_sub(value as u64, Ordering::Release);
191 }
192
193 fn set(&self, value: f64) {
194 self.0.store(value as u64, Ordering::Release);
195 }
196 }
197
198 impl HistogramFn for HistogramWrapper {
199 fn record(&self, value: f64) {
200 self.0.fetch_add(value as u64, Ordering::Release);
201 }
202 }
203
204 struct TestRecorder {
205 dropped: Arc<AtomicBool>,
206 counter: Arc<CounterWrapper>,
207 gauge: Arc<GaugeWrapper>,
208 histogram: Arc<HistogramWrapper>,
209 }
210
211 impl TestRecorder {
212 fn new() -> (Self, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>) {
213 let (recorder, _, counter, gauge, histogram) = Self::new_with_drop();
214 (recorder, counter, gauge, histogram)
215 }
216
217 fn new_with_drop(
218 ) -> (Self, Arc<AtomicBool>, Arc<CounterWrapper>, Arc<GaugeWrapper>, Arc<HistogramWrapper>)
219 {
220 let dropped = Arc::new(AtomicBool::new(false));
221 let counter = Arc::new(CounterWrapper(AtomicU64::new(0)));
222 let gauge = Arc::new(GaugeWrapper(AtomicU64::new(0)));
223 let histogram = Arc::new(HistogramWrapper(AtomicU64::new(0)));
224
225 let recorder = Self {
226 dropped: Arc::clone(&dropped),
227 counter: Arc::clone(&counter),
228 gauge: Arc::clone(&gauge),
229 histogram: Arc::clone(&histogram),
230 };
231
232 (recorder, dropped, counter, gauge, histogram)
233 }
234 }
235
236 impl Recorder for TestRecorder {
237 fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
238 todo!()
239 }
240
241 fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {
242 todo!()
243 }
244
245 fn describe_histogram(
246 &self,
247 _key: KeyName,
248 _unit: Option<Unit>,
249 _description: SharedString,
250 ) {
251 todo!()
252 }
253
254 fn register_counter(&self, _: &Key, _: &Metadata<'_>) -> Counter {
255 Counter::from_arc(Arc::clone(&self.counter))
256 }
257
258 fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> Gauge {
259 Gauge::from_arc(Arc::clone(&self.gauge))
260 }
261
262 fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram {
263 Histogram::from_arc(Arc::clone(&self.histogram))
264 }
265 }
266
267 impl Drop for TestRecorder {
268 fn drop(&mut self) {
269 self.dropped.store(true, Ordering::Release);
270 }
271 }
272
273 #[test]
274 fn basic() {
275 let (recorder, counter, gauge, histogram) = TestRecorder::new();
277 let recoverable = RecoverableRecorder::new(recorder);
278 let (recorder, handle) = recoverable.build();
279
280 metrics::with_local_recorder(&recorder, || {
283 metrics::counter!("counter").increment(5);
284 metrics::gauge!("gauge").increment(5.0);
285 metrics::gauge!("gauge").increment(5.0);
286 metrics::histogram!("histogram").record(5.0);
287 metrics::histogram!("histogram").record(5.0);
288 metrics::histogram!("histogram").record(5.0);
289 });
290
291 let _recorder = handle.into_inner();
292 assert_eq!(counter.get(), 5);
293 assert_eq!(gauge.get(), 10);
294 assert_eq!(histogram.get(), 15);
295
296 metrics::with_local_recorder(&recorder, || {
299 metrics::counter!("counter").increment(7);
300 metrics::gauge!("gauge").increment(7.0);
301 metrics::histogram!("histogram").record(7.0);
302 });
303
304 assert_eq!(counter.get(), 5);
305 assert_eq!(gauge.get(), 10);
306 assert_eq!(histogram.get(), 15);
307 }
308
309 #[test]
310 fn on_drop() {
311 let (recorder, dropped, counter, gauge, histogram) = TestRecorder::new_with_drop();
313 let recoverable = RecoverableRecorder::new(recorder);
314 let (recorder, handle) = recoverable.build();
315
316 metrics::with_local_recorder(&recorder, || {
319 metrics::counter!("counter").increment(5);
320 metrics::gauge!("gauge").increment(5.0);
321 metrics::gauge!("gauge").increment(5.0);
322 metrics::histogram!("histogram").record(5.0);
323 metrics::histogram!("histogram").record(5.0);
324 metrics::histogram!("histogram").record(5.0);
325 });
326
327 drop(handle.into_inner());
328 assert_eq!(counter.get(), 5);
329 assert_eq!(gauge.get(), 10);
330 assert_eq!(histogram.get(), 15);
331
332 metrics::with_local_recorder(&recorder, || {
335 metrics::counter!("counter").increment(7);
336 metrics::gauge!("gauge").increment(7.0);
337 metrics::histogram!("histogram").record(7.0);
338 });
339
340 assert_eq!(counter.get(), 5);
341 assert_eq!(gauge.get(), 10);
342 assert_eq!(histogram.get(), 15);
343
344 assert!(dropped.load(Ordering::Acquire));
346 }
347}