Skip to main content

reth_db/
metrics.rs

1use crate::Tables;
2use metrics::Histogram;
3use quanta::Instant;
4use reth_metrics::{metrics::Counter, Metrics};
5use rustc_hash::FxHashMap;
6use std::time::Duration;
7use strum::{EnumCount, EnumIter, IntoEnumIterator};
8
9const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
10
11/// Caches metric handles for database environment to make sure handles are not re-created
12/// on every operation.
13///
14/// Requires a metric recorder to be registered before creating an instance of this struct.
15/// Otherwise, metric recording will no-op.
16#[derive(Debug)]
17pub(crate) struct DatabaseEnvMetrics {
18    /// Caches `OperationMetrics` handles for each table and operation tuple.
19    operations: FxHashMap<(&'static str, Operation), OperationMetrics>,
20    /// Caches `TransactionMetrics` handles for counters grouped by only transaction mode.
21    /// Updated both at tx open and close.
22    transactions: FxHashMap<TransactionMode, TransactionMetrics>,
23    /// Caches `TransactionOutcomeMetrics` handles for counters grouped by transaction mode and
24    /// outcome. Can only be updated at tx close, as outcome is only known at that point.
25    transaction_outcomes:
26        FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
27}
28
29impl DatabaseEnvMetrics {
30    pub(crate) fn new() -> Self {
31        // Pre-populate metric handle maps with all possible combinations of labels
32        // to avoid runtime locks on the map when recording metrics.
33        Self {
34            operations: Self::generate_operation_handles(),
35            transactions: Self::generate_transaction_handles(),
36            transaction_outcomes: Self::generate_transaction_outcome_handles(),
37        }
38    }
39
40    /// Generate a map of all possible operation handles for each table and operation tuple.
41    /// Used for tracking all operation metrics.
42    fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> {
43        let mut operations = FxHashMap::with_capacity_and_hasher(
44            Tables::COUNT * Operation::COUNT,
45            Default::default(),
46        );
47        for table in Tables::ALL {
48            for operation in Operation::iter() {
49                operations.insert(
50                    (table.name(), operation),
51                    OperationMetrics::new_with_labels(&[
52                        (Labels::Table.as_str(), table.name()),
53                        (Labels::Operation.as_str(), operation.as_str()),
54                    ]),
55                );
56            }
57        }
58        operations
59    }
60
61    /// Generate a map of all possible transaction modes to metric handles.
62    /// Used for tracking a counter of open transactions.
63    fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
64        TransactionMode::iter()
65            .map(|mode| {
66                (
67                    mode,
68                    TransactionMetrics::new_with_labels(&[(
69                        Labels::TransactionMode.as_str(),
70                        mode.as_str(),
71                    )]),
72                )
73            })
74            .collect()
75    }
76
77    /// Generate a map of all possible transaction mode and outcome handles.
78    /// Used for tracking various stats for finished transactions (e.g. commit duration).
79    fn generate_transaction_outcome_handles(
80    ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
81        let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher(
82            TransactionMode::COUNT * TransactionOutcome::COUNT,
83            Default::default(),
84        );
85        for mode in TransactionMode::iter() {
86            for outcome in TransactionOutcome::iter() {
87                transaction_outcomes.insert(
88                    (mode, outcome),
89                    TransactionOutcomeMetrics::new_with_labels(&[
90                        (Labels::TransactionMode.as_str(), mode.as_str()),
91                        (Labels::TransactionOutcome.as_str(), outcome.as_str()),
92                    ]),
93                );
94            }
95        }
96        transaction_outcomes
97    }
98
99    /// Record a metric for database operation executed in `f`.
100    /// Panics if a metric recorder is not found for the given table and operation.
101    pub(crate) fn record_operation<R>(
102        &self,
103        table: &'static str,
104        operation: Operation,
105        value_size: Option<usize>,
106        f: impl FnOnce() -> R,
107    ) -> R {
108        if let Some(metrics) = self.operations.get(&(table, operation)) {
109            metrics.record(value_size, f)
110        } else {
111            f()
112        }
113    }
114
115    /// Record metrics for opening a database transaction.
116    pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
117        self.transactions
118            .get(&mode)
119            .expect("transaction mode metric handle not found")
120            .record_open();
121    }
122
123    /// Record metrics for closing a database transactions.
124    #[cfg(feature = "mdbx")]
125    pub(crate) fn record_closed_transaction(
126        &self,
127        mode: TransactionMode,
128        outcome: TransactionOutcome,
129        open_duration: Duration,
130        close_duration: Option<Duration>,
131        commit_latency: Option<reth_libmdbx::CommitLatency>,
132    ) {
133        self.transactions
134            .get(&mode)
135            .expect("transaction mode metric handle not found")
136            .record_close();
137
138        self.transaction_outcomes
139            .get(&(mode, outcome))
140            .expect("transaction outcome metric handle not found")
141            .record(open_duration, close_duration, commit_latency);
142    }
143}
144
145/// Transaction mode for the database, either read-only or read-write.
146#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
147pub(crate) enum TransactionMode {
148    /// Read-only transaction mode.
149    ReadOnly,
150    /// Read-write transaction mode.
151    ReadWrite,
152}
153
154impl TransactionMode {
155    /// Returns the transaction mode as a string.
156    pub(crate) const fn as_str(&self) -> &'static str {
157        match self {
158            Self::ReadOnly => "read-only",
159            Self::ReadWrite => "read-write",
160        }
161    }
162
163    /// Returns `true` if the transaction mode is read-only.
164    pub(crate) const fn is_read_only(&self) -> bool {
165        matches!(self, Self::ReadOnly)
166    }
167}
168
169/// Transaction outcome after a database operation - commit, abort, or drop.
170#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
171pub(crate) enum TransactionOutcome {
172    /// Successful commit of the transaction.
173    Commit,
174    /// Aborted transaction.
175    Abort,
176    /// Dropped transaction.
177    Drop,
178}
179
180impl TransactionOutcome {
181    /// Returns the transaction outcome as a string.
182    pub(crate) const fn as_str(&self) -> &'static str {
183        match self {
184            Self::Commit => "commit",
185            Self::Abort => "abort",
186            Self::Drop => "drop",
187        }
188    }
189
190    /// Returns `true` if the transaction outcome is a commit.
191    pub(crate) const fn is_commit(&self) -> bool {
192        matches!(self, Self::Commit)
193    }
194}
195
196/// Types of operations conducted on the database: get, put, delete, and various cursor operations.
197#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
198pub(crate) enum Operation {
199    /// Database get operation.
200    Get,
201    /// Database put upsert operation.
202    PutUpsert,
203    /// Database put append operation.
204    PutAppend,
205    /// Database delete operation.
206    Delete,
207    /// Database cursor upsert operation.
208    CursorUpsert,
209    /// Database cursor insert operation.
210    CursorInsert,
211    /// Database cursor append operation.
212    CursorAppend,
213    /// Database cursor append duplicates operation.
214    CursorAppendDup,
215    /// Database cursor delete current operation.
216    CursorDeleteCurrent,
217    /// Database cursor delete current duplicates operation.
218    CursorDeleteCurrentDuplicates,
219}
220
221impl Operation {
222    /// Returns the operation as a string.
223    pub(crate) const fn as_str(&self) -> &'static str {
224        match self {
225            Self::Get => "get",
226            Self::PutUpsert => "put-upsert",
227            Self::PutAppend => "put-append",
228            Self::Delete => "delete",
229            Self::CursorUpsert => "cursor-upsert",
230            Self::CursorInsert => "cursor-insert",
231            Self::CursorAppend => "cursor-append",
232            Self::CursorAppendDup => "cursor-append-dup",
233            Self::CursorDeleteCurrent => "cursor-delete-current",
234            Self::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates",
235        }
236    }
237}
238
239/// Enum defining labels for various aspects used in metrics.
240enum Labels {
241    /// Label representing a table.
242    Table,
243    /// Label representing a transaction mode.
244    TransactionMode,
245    /// Label representing a transaction outcome.
246    TransactionOutcome,
247    /// Label representing a database operation.
248    Operation,
249}
250
251impl Labels {
252    /// Converts each label variant into its corresponding string representation.
253    pub(crate) const fn as_str(&self) -> &'static str {
254        match self {
255            Self::Table => "table",
256            Self::TransactionMode => "mode",
257            Self::TransactionOutcome => "outcome",
258            Self::Operation => "operation",
259        }
260    }
261}
262
263#[derive(Metrics, Clone)]
264#[metrics(scope = "database.transaction")]
265pub(crate) struct TransactionMetrics {
266    /// Total number of opened database transactions (cumulative)
267    opened_total: Counter,
268    /// Total number of closed database transactions (cumulative)
269    closed_total: Counter,
270}
271
272impl TransactionMetrics {
273    pub(crate) fn record_open(&self) {
274        self.opened_total.increment(1);
275    }
276
277    pub(crate) fn record_close(&self) {
278        self.closed_total.increment(1);
279    }
280}
281
282#[derive(Metrics, Clone)]
283#[metrics(scope = "database.transaction")]
284pub(crate) struct TransactionOutcomeMetrics {
285    /// The time a database transaction has been open
286    open_duration_seconds: Histogram,
287    /// The time it took to close a database transaction
288    close_duration_seconds: Histogram,
289    /// The time it took to prepare a transaction commit
290    commit_preparation_duration_seconds: Histogram,
291    /// Duration of GC update during transaction commit by wall clock
292    commit_gc_wallclock_duration_seconds: Histogram,
293    /// The time it took to conduct audit of a transaction commit
294    commit_audit_duration_seconds: Histogram,
295    /// The time it took to write dirty/modified data pages to a filesystem during transaction
296    /// commit
297    commit_write_duration_seconds: Histogram,
298    /// The time it took to sync written data to the disk/storage during transaction commit
299    commit_sync_duration_seconds: Histogram,
300    /// The time it took to release resources during transaction commit
301    commit_ending_duration_seconds: Histogram,
302    /// The total duration of a transaction commit
303    commit_whole_duration_seconds: Histogram,
304    /// User-mode CPU time spent on GC update during transaction commit
305    commit_gc_cputime_duration_seconds: Histogram,
306}
307
308impl TransactionOutcomeMetrics {
309    /// Record transaction closing with the duration it was open and the duration it took to close
310    /// it.
311    #[cfg(feature = "mdbx")]
312    pub(crate) fn record(
313        &self,
314        open_duration: Duration,
315        close_duration: Option<Duration>,
316        commit_latency: Option<reth_libmdbx::CommitLatency>,
317    ) {
318        self.open_duration_seconds.record(open_duration);
319
320        if let Some(close_duration) = close_duration {
321            self.close_duration_seconds.record(close_duration)
322        }
323
324        if let Some(commit_latency) = commit_latency {
325            self.commit_preparation_duration_seconds.record(commit_latency.preparation());
326            self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
327            self.commit_audit_duration_seconds.record(commit_latency.audit());
328            self.commit_write_duration_seconds.record(commit_latency.write());
329            self.commit_sync_duration_seconds.record(commit_latency.sync());
330            self.commit_ending_duration_seconds.record(commit_latency.ending());
331            self.commit_whole_duration_seconds.record(commit_latency.whole());
332            self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
333        }
334    }
335}
336
337#[derive(Metrics, Clone)]
338#[metrics(scope = "database.operation")]
339pub(crate) struct OperationMetrics {
340    /// Total number of database operations made
341    calls_total: Counter,
342    /// The time it took to execute a database operation (`put/upsert/insert/append/append_dup`)
343    /// with value larger than [`LARGE_VALUE_THRESHOLD_BYTES`] bytes.
344    large_value_duration_seconds: Histogram,
345}
346
347impl OperationMetrics {
348    /// Record operation metric.
349    ///
350    /// The duration it took to execute the closure is recorded only if the provided `value_size` is
351    /// larger than [`LARGE_VALUE_THRESHOLD_BYTES`].
352    pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
353        self.calls_total.increment(1);
354
355        // Record duration only for large values to prevent the performance hit of clock syscall
356        // on small operations
357        if value_size.is_some_and(|size| size > LARGE_VALUE_THRESHOLD_BYTES) {
358            let start = Instant::now();
359            let result = f();
360            self.large_value_duration_seconds.record(start.elapsed());
361            result
362        } else {
363            f()
364        }
365    }
366}