1use crate::Tables;
2use metrics::Histogram;
3use quanta::Instant;
4use reth_metrics::{metrics::Counter, Metrics};
5use rustc_hash::FxHashMap;
6use std::{array, sync::Arc, time::Duration};
7use strum::{EnumCount, EnumIter, IntoEnumIterator};
8
9const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
10
11#[derive(Debug)]
17pub(crate) struct DatabaseEnvMetrics {
18 operations: FxHashMap<&'static str, TableOperationMetrics>,
20 transactions: FxHashMap<TransactionMode, TransactionMetrics>,
23 transaction_outcomes:
26 FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
27}
28
29pub(crate) type TableOperationMetrics = Arc<[OperationMetrics; Operation::COUNT]>;
31
32impl DatabaseEnvMetrics {
33 pub(crate) fn new() -> Self {
34 Self {
37 operations: Self::generate_operation_handles(),
38 transactions: Self::generate_transaction_handles(),
39 transaction_outcomes: Self::generate_transaction_outcome_handles(),
40 }
41 }
42
43 fn generate_operation_handles() -> FxHashMap<&'static str, TableOperationMetrics> {
45 let mut operations = FxHashMap::with_capacity_and_hasher(Tables::COUNT, Default::default());
46
47 for table in Tables::ALL {
48 let table_name = table.name();
49 let metrics = array::from_fn(|index| {
50 let operation = Operation::from_index(index);
51 OperationMetrics::new_with_labels(&[
52 (Labels::Table.as_str(), table_name),
53 (Labels::Operation.as_str(), operation.as_str()),
54 ])
55 });
56
57 operations.insert(table_name, Arc::new(metrics));
58 }
59
60 operations
61 }
62
63 fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
66 TransactionMode::iter()
67 .map(|mode| {
68 (
69 mode,
70 TransactionMetrics::new_with_labels(&[(
71 Labels::TransactionMode.as_str(),
72 mode.as_str(),
73 )]),
74 )
75 })
76 .collect()
77 }
78
79 fn generate_transaction_outcome_handles(
82 ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
83 let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher(
84 TransactionMode::COUNT * TransactionOutcome::COUNT,
85 Default::default(),
86 );
87 for mode in TransactionMode::iter() {
88 for outcome in TransactionOutcome::iter() {
89 transaction_outcomes.insert(
90 (mode, outcome),
91 TransactionOutcomeMetrics::new_with_labels(&[
92 (Labels::TransactionMode.as_str(), mode.as_str()),
93 (Labels::TransactionOutcome.as_str(), outcome.as_str()),
94 ]),
95 );
96 }
97 }
98 transaction_outcomes
99 }
100
101 pub(crate) fn record_operation<R>(
104 &self,
105 table: &'static str,
106 operation: Operation,
107 value_size: Option<usize>,
108 f: impl FnOnce() -> R,
109 ) -> R {
110 if let Some(metrics) = self.operations.get(table) {
111 metrics[operation.index()].record(value_size, f)
112 } else {
113 f()
114 }
115 }
116
117 pub(crate) fn table_operation_metrics(&self, table: &'static str) -> TableOperationMetrics {
119 self.operations.get(table).expect("table operation metric handles not found").clone()
120 }
121
122 pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
124 self.transactions
125 .get(&mode)
126 .expect("transaction mode metric handle not found")
127 .record_open();
128 }
129
130 #[cfg(feature = "mdbx")]
132 pub(crate) fn record_closed_transaction(
133 &self,
134 mode: TransactionMode,
135 outcome: TransactionOutcome,
136 open_duration: Duration,
137 close_duration: Option<Duration>,
138 commit_latency: Option<reth_libmdbx::CommitLatency>,
139 ) {
140 self.transactions
141 .get(&mode)
142 .expect("transaction mode metric handle not found")
143 .record_close();
144
145 self.transaction_outcomes
146 .get(&(mode, outcome))
147 .expect("transaction outcome metric handle not found")
148 .record(open_duration, close_duration, commit_latency);
149 }
150}
151
152#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
154pub(crate) enum TransactionMode {
155 ReadOnly,
157 ReadWrite,
159}
160
161impl TransactionMode {
162 pub(crate) const fn as_str(&self) -> &'static str {
164 match self {
165 Self::ReadOnly => "read-only",
166 Self::ReadWrite => "read-write",
167 }
168 }
169
170 pub(crate) const fn is_read_only(&self) -> bool {
172 matches!(self, Self::ReadOnly)
173 }
174}
175
176#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
178pub(crate) enum TransactionOutcome {
179 Commit,
181 Abort,
183 Drop,
185}
186
187impl TransactionOutcome {
188 pub(crate) const fn as_str(&self) -> &'static str {
190 match self {
191 Self::Commit => "commit",
192 Self::Abort => "abort",
193 Self::Drop => "drop",
194 }
195 }
196
197 pub(crate) const fn is_commit(&self) -> bool {
199 matches!(self, Self::Commit)
200 }
201}
202
203#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
205pub(crate) enum Operation {
206 Get,
208 PutUpsert,
210 PutAppend,
212 Delete,
214 CursorUpsert,
216 CursorInsert,
218 CursorAppend,
220 CursorAppendDup,
222 CursorDeleteCurrent,
224 CursorDeleteCurrentDuplicates,
226}
227
228impl Operation {
229 pub(crate) const fn index(&self) -> usize {
231 match self {
232 Self::Get => 0,
233 Self::PutUpsert => 1,
234 Self::PutAppend => 2,
235 Self::Delete => 3,
236 Self::CursorUpsert => 4,
237 Self::CursorInsert => 5,
238 Self::CursorAppend => 6,
239 Self::CursorAppendDup => 7,
240 Self::CursorDeleteCurrent => 8,
241 Self::CursorDeleteCurrentDuplicates => 9,
242 }
243 }
244
245 const fn from_index(index: usize) -> Self {
247 match index {
248 0 => Self::Get,
249 1 => Self::PutUpsert,
250 2 => Self::PutAppend,
251 3 => Self::Delete,
252 4 => Self::CursorUpsert,
253 5 => Self::CursorInsert,
254 6 => Self::CursorAppend,
255 7 => Self::CursorAppendDup,
256 8 => Self::CursorDeleteCurrent,
257 9 => Self::CursorDeleteCurrentDuplicates,
258 _ => panic!("invalid operation index"),
259 }
260 }
261
262 pub(crate) const fn as_str(&self) -> &'static str {
264 match self {
265 Self::Get => "get",
266 Self::PutUpsert => "put-upsert",
267 Self::PutAppend => "put-append",
268 Self::Delete => "delete",
269 Self::CursorUpsert => "cursor-upsert",
270 Self::CursorInsert => "cursor-insert",
271 Self::CursorAppend => "cursor-append",
272 Self::CursorAppendDup => "cursor-append-dup",
273 Self::CursorDeleteCurrent => "cursor-delete-current",
274 Self::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates",
275 }
276 }
277}
278
279enum Labels {
281 Table,
283 TransactionMode,
285 TransactionOutcome,
287 Operation,
289}
290
291impl Labels {
292 pub(crate) const fn as_str(&self) -> &'static str {
294 match self {
295 Self::Table => "table",
296 Self::TransactionMode => "mode",
297 Self::TransactionOutcome => "outcome",
298 Self::Operation => "operation",
299 }
300 }
301}
302
303#[derive(Metrics, Clone)]
304#[metrics(scope = "database.transaction")]
305pub(crate) struct TransactionMetrics {
306 opened_total: Counter,
308 closed_total: Counter,
310}
311
312impl TransactionMetrics {
313 pub(crate) fn record_open(&self) {
314 self.opened_total.increment(1);
315 }
316
317 pub(crate) fn record_close(&self) {
318 self.closed_total.increment(1);
319 }
320}
321
322#[derive(Metrics, Clone)]
323#[metrics(scope = "database.transaction")]
324pub(crate) struct TransactionOutcomeMetrics {
325 open_duration_seconds: Histogram,
327 close_duration_seconds: Histogram,
329 commit_preparation_duration_seconds: Histogram,
331 commit_gc_wallclock_duration_seconds: Histogram,
333 commit_audit_duration_seconds: Histogram,
335 commit_write_duration_seconds: Histogram,
338 commit_sync_duration_seconds: Histogram,
340 commit_ending_duration_seconds: Histogram,
342 commit_whole_duration_seconds: Histogram,
344 commit_gc_cputime_duration_seconds: Histogram,
346}
347
348impl TransactionOutcomeMetrics {
349 #[cfg(feature = "mdbx")]
352 pub(crate) fn record(
353 &self,
354 open_duration: Duration,
355 close_duration: Option<Duration>,
356 commit_latency: Option<reth_libmdbx::CommitLatency>,
357 ) {
358 self.open_duration_seconds.record(open_duration);
359
360 if let Some(close_duration) = close_duration {
361 self.close_duration_seconds.record(close_duration)
362 }
363
364 if let Some(commit_latency) = commit_latency {
365 self.commit_preparation_duration_seconds.record(commit_latency.preparation());
366 self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
367 self.commit_audit_duration_seconds.record(commit_latency.audit());
368 self.commit_write_duration_seconds.record(commit_latency.write());
369 self.commit_sync_duration_seconds.record(commit_latency.sync());
370 self.commit_ending_duration_seconds.record(commit_latency.ending());
371 self.commit_whole_duration_seconds.record(commit_latency.whole());
372 self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
373 }
374 }
375}
376
377#[derive(Metrics, Clone)]
378#[metrics(scope = "database.operation")]
379pub(crate) struct OperationMetrics {
380 calls_total: Counter,
382 large_value_duration_seconds: Histogram,
385}
386
387impl OperationMetrics {
388 pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
393 self.calls_total.increment(1);
394
395 if value_size.is_some_and(|size| size > LARGE_VALUE_THRESHOLD_BYTES) {
398 let start = Instant::now();
399 let result = f();
400 self.large_value_duration_seconds.record(start.elapsed());
401 result
402 } else {
403 f()
404 }
405 }
406}