Skip to main content

reth_db/
metrics.rs

1use crate::Tables;
2use metrics::Histogram;
3use quanta::Instant;
4use reth_metrics::{metrics::Counter, Metrics};
5use rustc_hash::FxHashMap;
6use std::{array, sync::Arc, time::Duration};
7use strum::{EnumCount, EnumIter, IntoEnumIterator};
8
9const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
10
11/// Caches metric handles for database environment to make sure handles are not re-created
12/// on every operation.
13///
14/// Requires a metric recorder to be registered before creating an instance of this struct.
15/// Otherwise, metric recording will no-op.
16#[derive(Debug)]
17pub(crate) struct DatabaseEnvMetrics {
18    /// Caches per-table operation metric handles for all database operation metrics.
19    operations: FxHashMap<&'static str, TableOperationMetrics>,
20    /// Caches `TransactionMetrics` handles for counters grouped by only transaction mode.
21    /// Updated both at tx open and close.
22    transactions: FxHashMap<TransactionMode, TransactionMetrics>,
23    /// Caches `TransactionOutcomeMetrics` handles for counters grouped by transaction mode and
24    /// outcome. Can only be updated at tx close, as outcome is only known at that point.
25    transaction_outcomes:
26        FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
27}
28
29/// Per-table operation metric handles cached for hot cursor paths.
30pub(crate) type TableOperationMetrics = Arc<[OperationMetrics; Operation::COUNT]>;
31
32impl DatabaseEnvMetrics {
33    pub(crate) fn new() -> Self {
34        // Pre-populate metric handle maps with all possible combinations of labels
35        // to avoid runtime locks on the map when recording metrics.
36        Self {
37            operations: Self::generate_operation_handles(),
38            transactions: Self::generate_transaction_handles(),
39            transaction_outcomes: Self::generate_transaction_outcome_handles(),
40        }
41    }
42
43    /// Generate a map of pre-bound operation handles for each table.
44    fn generate_operation_handles() -> FxHashMap<&'static str, TableOperationMetrics> {
45        let mut operations = FxHashMap::with_capacity_and_hasher(Tables::COUNT, Default::default());
46
47        for table in Tables::ALL {
48            let table_name = table.name();
49            let metrics = array::from_fn(|index| {
50                let operation = Operation::from_index(index);
51                OperationMetrics::new_with_labels(&[
52                    (Labels::Table.as_str(), table_name),
53                    (Labels::Operation.as_str(), operation.as_str()),
54                ])
55            });
56
57            operations.insert(table_name, Arc::new(metrics));
58        }
59
60        operations
61    }
62
63    /// Generate a map of all possible transaction modes to metric handles.
64    /// Used for tracking a counter of open transactions.
65    fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
66        TransactionMode::iter()
67            .map(|mode| {
68                (
69                    mode,
70                    TransactionMetrics::new_with_labels(&[(
71                        Labels::TransactionMode.as_str(),
72                        mode.as_str(),
73                    )]),
74                )
75            })
76            .collect()
77    }
78
79    /// Generate a map of all possible transaction mode and outcome handles.
80    /// Used for tracking various stats for finished transactions (e.g. commit duration).
81    fn generate_transaction_outcome_handles(
82    ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
83        let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher(
84            TransactionMode::COUNT * TransactionOutcome::COUNT,
85            Default::default(),
86        );
87        for mode in TransactionMode::iter() {
88            for outcome in TransactionOutcome::iter() {
89                transaction_outcomes.insert(
90                    (mode, outcome),
91                    TransactionOutcomeMetrics::new_with_labels(&[
92                        (Labels::TransactionMode.as_str(), mode.as_str()),
93                        (Labels::TransactionOutcome.as_str(), outcome.as_str()),
94                    ]),
95                );
96            }
97        }
98        transaction_outcomes
99    }
100
101    /// Record a metric for database operation executed in `f`.
102    /// Panics if a metric recorder is not found for the given table and operation.
103    pub(crate) fn record_operation<R>(
104        &self,
105        table: &'static str,
106        operation: Operation,
107        value_size: Option<usize>,
108        f: impl FnOnce() -> R,
109    ) -> R {
110        if let Some(metrics) = self.operations.get(table) {
111            metrics[operation.index()].record(value_size, f)
112        } else {
113            f()
114        }
115    }
116
117    /// Returns pre-bound operation metric handles for a single table.
118    pub(crate) fn table_operation_metrics(&self, table: &'static str) -> TableOperationMetrics {
119        self.operations.get(table).expect("table operation metric handles not found").clone()
120    }
121
122    /// Record metrics for opening a database transaction.
123    pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
124        self.transactions
125            .get(&mode)
126            .expect("transaction mode metric handle not found")
127            .record_open();
128    }
129
130    /// Record metrics for closing a database transactions.
131    #[cfg(feature = "mdbx")]
132    pub(crate) fn record_closed_transaction(
133        &self,
134        mode: TransactionMode,
135        outcome: TransactionOutcome,
136        open_duration: Duration,
137        close_duration: Option<Duration>,
138        commit_latency: Option<reth_libmdbx::CommitLatency>,
139    ) {
140        self.transactions
141            .get(&mode)
142            .expect("transaction mode metric handle not found")
143            .record_close();
144
145        self.transaction_outcomes
146            .get(&(mode, outcome))
147            .expect("transaction outcome metric handle not found")
148            .record(open_duration, close_duration, commit_latency);
149    }
150}
151
152/// Transaction mode for the database, either read-only or read-write.
153#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
154pub(crate) enum TransactionMode {
155    /// Read-only transaction mode.
156    ReadOnly,
157    /// Read-write transaction mode.
158    ReadWrite,
159}
160
161impl TransactionMode {
162    /// Returns the transaction mode as a string.
163    pub(crate) const fn as_str(&self) -> &'static str {
164        match self {
165            Self::ReadOnly => "read-only",
166            Self::ReadWrite => "read-write",
167        }
168    }
169
170    /// Returns `true` if the transaction mode is read-only.
171    pub(crate) const fn is_read_only(&self) -> bool {
172        matches!(self, Self::ReadOnly)
173    }
174}
175
176/// Transaction outcome after a database operation - commit, abort, or drop.
177#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
178pub(crate) enum TransactionOutcome {
179    /// Successful commit of the transaction.
180    Commit,
181    /// Aborted transaction.
182    Abort,
183    /// Dropped transaction.
184    Drop,
185}
186
187impl TransactionOutcome {
188    /// Returns the transaction outcome as a string.
189    pub(crate) const fn as_str(&self) -> &'static str {
190        match self {
191            Self::Commit => "commit",
192            Self::Abort => "abort",
193            Self::Drop => "drop",
194        }
195    }
196
197    /// Returns `true` if the transaction outcome is a commit.
198    pub(crate) const fn is_commit(&self) -> bool {
199        matches!(self, Self::Commit)
200    }
201}
202
203/// Types of operations conducted on the database: get, put, delete, and various cursor operations.
204#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
205pub(crate) enum Operation {
206    /// Database get operation.
207    Get,
208    /// Database put upsert operation.
209    PutUpsert,
210    /// Database put append operation.
211    PutAppend,
212    /// Database delete operation.
213    Delete,
214    /// Database cursor upsert operation.
215    CursorUpsert,
216    /// Database cursor insert operation.
217    CursorInsert,
218    /// Database cursor append operation.
219    CursorAppend,
220    /// Database cursor append duplicates operation.
221    CursorAppendDup,
222    /// Database cursor delete current operation.
223    CursorDeleteCurrent,
224    /// Database cursor delete current duplicates operation.
225    CursorDeleteCurrentDuplicates,
226}
227
228impl Operation {
229    /// Returns the index of the operation in the cached per-table operation array.
230    pub(crate) const fn index(&self) -> usize {
231        match self {
232            Self::Get => 0,
233            Self::PutUpsert => 1,
234            Self::PutAppend => 2,
235            Self::Delete => 3,
236            Self::CursorUpsert => 4,
237            Self::CursorInsert => 5,
238            Self::CursorAppend => 6,
239            Self::CursorAppendDup => 7,
240            Self::CursorDeleteCurrent => 8,
241            Self::CursorDeleteCurrentDuplicates => 9,
242        }
243    }
244
245    /// Returns the operation for the given index in the cached per-table operation array.
246    const fn from_index(index: usize) -> Self {
247        match index {
248            0 => Self::Get,
249            1 => Self::PutUpsert,
250            2 => Self::PutAppend,
251            3 => Self::Delete,
252            4 => Self::CursorUpsert,
253            5 => Self::CursorInsert,
254            6 => Self::CursorAppend,
255            7 => Self::CursorAppendDup,
256            8 => Self::CursorDeleteCurrent,
257            9 => Self::CursorDeleteCurrentDuplicates,
258            _ => panic!("invalid operation index"),
259        }
260    }
261
262    /// Returns the operation as a string.
263    pub(crate) const fn as_str(&self) -> &'static str {
264        match self {
265            Self::Get => "get",
266            Self::PutUpsert => "put-upsert",
267            Self::PutAppend => "put-append",
268            Self::Delete => "delete",
269            Self::CursorUpsert => "cursor-upsert",
270            Self::CursorInsert => "cursor-insert",
271            Self::CursorAppend => "cursor-append",
272            Self::CursorAppendDup => "cursor-append-dup",
273            Self::CursorDeleteCurrent => "cursor-delete-current",
274            Self::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates",
275        }
276    }
277}
278
279/// Enum defining labels for various aspects used in metrics.
280enum Labels {
281    /// Label representing a table.
282    Table,
283    /// Label representing a transaction mode.
284    TransactionMode,
285    /// Label representing a transaction outcome.
286    TransactionOutcome,
287    /// Label representing a database operation.
288    Operation,
289}
290
291impl Labels {
292    /// Converts each label variant into its corresponding string representation.
293    pub(crate) const fn as_str(&self) -> &'static str {
294        match self {
295            Self::Table => "table",
296            Self::TransactionMode => "mode",
297            Self::TransactionOutcome => "outcome",
298            Self::Operation => "operation",
299        }
300    }
301}
302
303#[derive(Metrics, Clone)]
304#[metrics(scope = "database.transaction")]
305pub(crate) struct TransactionMetrics {
306    /// Total number of opened database transactions (cumulative)
307    opened_total: Counter,
308    /// Total number of closed database transactions (cumulative)
309    closed_total: Counter,
310}
311
312impl TransactionMetrics {
313    pub(crate) fn record_open(&self) {
314        self.opened_total.increment(1);
315    }
316
317    pub(crate) fn record_close(&self) {
318        self.closed_total.increment(1);
319    }
320}
321
322#[derive(Metrics, Clone)]
323#[metrics(scope = "database.transaction")]
324pub(crate) struct TransactionOutcomeMetrics {
325    /// The time a database transaction has been open
326    open_duration_seconds: Histogram,
327    /// The time it took to close a database transaction
328    close_duration_seconds: Histogram,
329    /// The time it took to prepare a transaction commit
330    commit_preparation_duration_seconds: Histogram,
331    /// Duration of GC update during transaction commit by wall clock
332    commit_gc_wallclock_duration_seconds: Histogram,
333    /// The time it took to conduct audit of a transaction commit
334    commit_audit_duration_seconds: Histogram,
335    /// The time it took to write dirty/modified data pages to a filesystem during transaction
336    /// commit
337    commit_write_duration_seconds: Histogram,
338    /// The time it took to sync written data to the disk/storage during transaction commit
339    commit_sync_duration_seconds: Histogram,
340    /// The time it took to release resources during transaction commit
341    commit_ending_duration_seconds: Histogram,
342    /// The total duration of a transaction commit
343    commit_whole_duration_seconds: Histogram,
344    /// User-mode CPU time spent on GC update during transaction commit
345    commit_gc_cputime_duration_seconds: Histogram,
346}
347
348impl TransactionOutcomeMetrics {
349    /// Record transaction closing with the duration it was open and the duration it took to close
350    /// it.
351    #[cfg(feature = "mdbx")]
352    pub(crate) fn record(
353        &self,
354        open_duration: Duration,
355        close_duration: Option<Duration>,
356        commit_latency: Option<reth_libmdbx::CommitLatency>,
357    ) {
358        self.open_duration_seconds.record(open_duration);
359
360        if let Some(close_duration) = close_duration {
361            self.close_duration_seconds.record(close_duration)
362        }
363
364        if let Some(commit_latency) = commit_latency {
365            self.commit_preparation_duration_seconds.record(commit_latency.preparation());
366            self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
367            self.commit_audit_duration_seconds.record(commit_latency.audit());
368            self.commit_write_duration_seconds.record(commit_latency.write());
369            self.commit_sync_duration_seconds.record(commit_latency.sync());
370            self.commit_ending_duration_seconds.record(commit_latency.ending());
371            self.commit_whole_duration_seconds.record(commit_latency.whole());
372            self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
373        }
374    }
375}
376
377#[derive(Metrics, Clone)]
378#[metrics(scope = "database.operation")]
379pub(crate) struct OperationMetrics {
380    /// Total number of database operations made
381    calls_total: Counter,
382    /// The time it took to execute a database operation (`put/upsert/insert/append/append_dup`)
383    /// with value larger than [`LARGE_VALUE_THRESHOLD_BYTES`] bytes.
384    large_value_duration_seconds: Histogram,
385}
386
387impl OperationMetrics {
388    /// Record operation metric.
389    ///
390    /// The duration it took to execute the closure is recorded only if the provided `value_size` is
391    /// larger than [`LARGE_VALUE_THRESHOLD_BYTES`].
392    pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
393        self.calls_total.increment(1);
394
395        // Record duration only for large values to prevent the performance hit of clock syscall
396        // on small operations
397        if value_size.is_some_and(|size| size > LARGE_VALUE_THRESHOLD_BYTES) {
398            let start = Instant::now();
399            let result = f();
400            self.large_value_duration_seconds.record(start.elapsed());
401            result
402        } else {
403            f()
404        }
405    }
406}