reth_db/metrics.rs
use crate::Tables;
use metrics::{Gauge, Histogram};
use reth_metrics::{metrics::Counter, Metrics};
use rustc_hash::FxHashMap;
use std::time::{Duration, Instant};
use strum::{EnumCount, EnumIter, IntoEnumIterator};

/// Value size threshold, in bytes, above which the duration of a write operation is recorded.
const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;

/// Caches metric handles for the database environment so that handles are not re-created
/// on every operation.
///
/// Requires a metric recorder to be registered before creating an instance of this struct;
/// otherwise, metric recording is a no-op.
#[derive(Debug)]
pub(crate) struct DatabaseEnvMetrics {
/// Caches `OperationMetrics` handles for each table and operation tuple.
operations: FxHashMap<(&'static str, Operation), OperationMetrics>,
    /// Caches `TransactionMetrics` handles for counters grouped only by transaction mode.
/// Updated both at tx open and close.
transactions: FxHashMap<TransactionMode, TransactionMetrics>,
/// Caches `TransactionOutcomeMetrics` handles for counters grouped by transaction mode and
/// outcome. Can only be updated at tx close, as outcome is only known at that point.
transaction_outcomes:
FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
}

impl DatabaseEnvMetrics {
    /// Create a new instance, pre-populating metric handles for all label combinations.
    pub(crate) fn new() -> Self {
// Pre-populate metric handle maps with all possible combinations of labels
// to avoid runtime locks on the map when recording metrics.
Self {
operations: Self::generate_operation_handles(),
transactions: Self::generate_transaction_handles(),
transaction_outcomes: Self::generate_transaction_outcome_handles(),
}
    }

    /// Generate a map of all possible operation handles for each table and operation tuple.
/// Used for tracking all operation metrics.
fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> {
let mut operations = FxHashMap::with_capacity_and_hasher(
Tables::COUNT * Operation::COUNT,
Default::default(),
);
for table in Tables::ALL {
for operation in Operation::iter() {
operations.insert(
(table.name(), operation),
OperationMetrics::new_with_labels(&[
(Labels::Table.as_str(), table.name()),
(Labels::Operation.as_str(), operation.as_str()),
]),
);
}
}
operations
    }

    /// Generate a map of all possible transaction modes to metric handles.
/// Used for tracking a counter of open transactions.
fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
TransactionMode::iter()
.map(|mode| {
(
mode,
TransactionMetrics::new_with_labels(&[(
Labels::TransactionMode.as_str(),
mode.as_str(),
)]),
)
})
.collect()
    }

    /// Generate a map of all possible transaction mode and outcome handles.
/// Used for tracking various stats for finished transactions (e.g. commit duration).
fn generate_transaction_outcome_handles(
) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher(
TransactionMode::COUNT * TransactionOutcome::COUNT,
Default::default(),
);
for mode in TransactionMode::iter() {
for outcome in TransactionOutcome::iter() {
transaction_outcomes.insert(
(mode, outcome),
TransactionOutcomeMetrics::new_with_labels(&[
(Labels::TransactionMode.as_str(), mode.as_str()),
(Labels::TransactionOutcome.as_str(), outcome.as_str()),
]),
);
}
}
transaction_outcomes
    }

    /// Record a metric for the database operation executed in `f`.
    ///
    /// If no metric handle is cached for the given table and operation, `f` is executed
    /// without recording.
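    ///
    /// # Example
    ///
    /// A minimal sketch of wrapping a read; the table label and the `db_get` closure are
    /// illustrative placeholders rather than part of this module:
    ///
    /// ```ignore
    /// let metrics = DatabaseEnvMetrics::new();
    /// let value = metrics.record_operation(
    ///     "CanonicalHeaders", // table label; unknown labels skip recording
    ///     Operation::Get,
    ///     None,               // reads carry no value size
    ///     || db_get(key),     // the wrapped database call
    /// );
    /// ```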
pub(crate) fn record_operation<R>(
&self,
table: &'static str,
operation: Operation,
value_size: Option<usize>,
f: impl FnOnce() -> R,
) -> R {
if let Some(metrics) = self.operations.get(&(table, operation)) {
metrics.record(value_size, f)
} else {
f()
}
    }

    /// Record metrics for opening a database transaction.
pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
self.transactions
.get(&mode)
.expect("transaction mode metric handle not found")
.record_open();
    }

    /// Record metrics for closing a database transaction.
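    ///
    /// A minimal sketch of the expected pairing with [`Self::record_opened_transaction`];
    /// the durations and commit latency below are placeholders captured by the caller:
    ///
    /// ```ignore
    /// metrics.record_opened_transaction(TransactionMode::ReadWrite);
    /// // ... run and commit the transaction ...
    /// metrics.record_closed_transaction(
    ///     TransactionMode::ReadWrite,
    ///     TransactionOutcome::Commit,
    ///     open_duration,        // how long the transaction stayed open
    ///     Some(close_duration), // how long closing it took
    ///     Some(commit_latency), // `reth_libmdbx::CommitLatency` from the MDBX commit
    /// );
    /// ```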
#[cfg(feature = "mdbx")]
pub(crate) fn record_closed_transaction(
&self,
mode: TransactionMode,
outcome: TransactionOutcome,
open_duration: Duration,
close_duration: Option<Duration>,
commit_latency: Option<reth_libmdbx::CommitLatency>,
) {
self.transactions
.get(&mode)
.expect("transaction mode metric handle not found")
.record_close();
self.transaction_outcomes
.get(&(mode, outcome))
.expect("transaction outcome metric handle not found")
.record(open_duration, close_duration, commit_latency);
}
}

/// Transaction mode for the database, either read-only or read-write.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
pub(crate) enum TransactionMode {
/// Read-only transaction mode.
ReadOnly,
/// Read-write transaction mode.
ReadWrite,
}

impl TransactionMode {
/// Returns the transaction mode as a string.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Self::ReadOnly => "read-only",
Self::ReadWrite => "read-write",
}
    }

    /// Returns `true` if the transaction mode is read-only.
pub(crate) const fn is_read_only(&self) -> bool {
matches!(self, Self::ReadOnly)
}
}

/// Transaction outcome after a database operation: commit, abort, or drop.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
pub(crate) enum TransactionOutcome {
/// Successful commit of the transaction.
Commit,
/// Aborted transaction.
Abort,
/// Dropped transaction.
Drop,
}

impl TransactionOutcome {
/// Returns the transaction outcome as a string.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Self::Commit => "commit",
Self::Abort => "abort",
Self::Drop => "drop",
}
    }

    /// Returns `true` if the transaction outcome is a commit.
pub(crate) const fn is_commit(&self) -> bool {
matches!(self, Self::Commit)
}
}

/// Types of operations conducted on the database: get, put, delete, and various cursor operations.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
pub(crate) enum Operation {
/// Database get operation.
Get,
/// Database put operation.
Put,
/// Database delete operation.
Delete,
/// Database cursor upsert operation.
CursorUpsert,
/// Database cursor insert operation.
CursorInsert,
/// Database cursor append operation.
CursorAppend,
/// Database cursor append duplicates operation.
CursorAppendDup,
/// Database cursor delete current operation.
CursorDeleteCurrent,
/// Database cursor delete current duplicates operation.
CursorDeleteCurrentDuplicates,
}

impl Operation {
/// Returns the operation as a string.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Self::Get => "get",
Self::Put => "put",
Self::Delete => "delete",
Self::CursorUpsert => "cursor-upsert",
Self::CursorInsert => "cursor-insert",
Self::CursorAppend => "cursor-append",
Self::CursorAppendDup => "cursor-append-dup",
Self::CursorDeleteCurrent => "cursor-delete-current",
Self::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates",
}
}
}

/// Enum defining labels for various aspects used in metrics.
enum Labels {
/// Label representing a table.
Table,
/// Label representing a transaction mode.
TransactionMode,
/// Label representing a transaction outcome.
TransactionOutcome,
/// Label representing a database operation.
Operation,
}

impl Labels {
/// Converts each label variant into its corresponding string representation.
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Self::Table => "table",
Self::TransactionMode => "mode",
Self::TransactionOutcome => "outcome",
Self::Operation => "operation",
}
}
}

#[derive(Metrics, Clone)]
#[metrics(scope = "database.transaction")]
pub(crate) struct TransactionMetrics {
/// Total number of currently open database transactions
open_total: Gauge,
}

impl TransactionMetrics {
    /// Record a transaction opening by incrementing the open transactions gauge.
    pub(crate) fn record_open(&self) {
self.open_total.increment(1.0);
    }

    /// Record a transaction closing by decrementing the open transactions gauge.
    pub(crate) fn record_close(&self) {
self.open_total.decrement(1.0);
}
}

#[derive(Metrics, Clone)]
#[metrics(scope = "database.transaction")]
pub(crate) struct TransactionOutcomeMetrics {
/// The time a database transaction has been open
open_duration_seconds: Histogram,
/// The time it took to close a database transaction
close_duration_seconds: Histogram,
/// The time it took to prepare a transaction commit
commit_preparation_duration_seconds: Histogram,
/// Duration of GC update during transaction commit by wall clock
commit_gc_wallclock_duration_seconds: Histogram,
/// The time it took to conduct audit of a transaction commit
commit_audit_duration_seconds: Histogram,
/// The time it took to write dirty/modified data pages to a filesystem during transaction
/// commit
commit_write_duration_seconds: Histogram,
/// The time it took to sync written data to the disk/storage during transaction commit
commit_sync_duration_seconds: Histogram,
/// The time it took to release resources during transaction commit
commit_ending_duration_seconds: Histogram,
/// The total duration of a transaction commit
commit_whole_duration_seconds: Histogram,
/// User-mode CPU time spent on GC update during transaction commit
commit_gc_cputime_duration_seconds: Histogram,
}

impl TransactionOutcomeMetrics {
/// Record transaction closing with the duration it was open and the duration it took to close
/// it.
#[cfg(feature = "mdbx")]
pub(crate) fn record(
&self,
open_duration: Duration,
close_duration: Option<Duration>,
commit_latency: Option<reth_libmdbx::CommitLatency>,
) {
self.open_duration_seconds.record(open_duration);
if let Some(close_duration) = close_duration {
self.close_duration_seconds.record(close_duration)
}
if let Some(commit_latency) = commit_latency {
self.commit_preparation_duration_seconds.record(commit_latency.preparation());
self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
self.commit_audit_duration_seconds.record(commit_latency.audit());
self.commit_write_duration_seconds.record(commit_latency.write());
self.commit_sync_duration_seconds.record(commit_latency.sync());
self.commit_ending_duration_seconds.record(commit_latency.ending());
self.commit_whole_duration_seconds.record(commit_latency.whole());
self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
}
}
}

#[derive(Metrics, Clone)]
#[metrics(scope = "database.operation")]
pub(crate) struct OperationMetrics {
/// Total number of database operations made
calls_total: Counter,
/// The time it took to execute a database operation (`put/upsert/insert/append/append_dup`)
/// with value larger than [`LARGE_VALUE_THRESHOLD_BYTES`] bytes.
large_value_duration_seconds: Histogram,
}

impl OperationMetrics {
/// Record operation metric.
///
/// The duration it took to execute the closure is recorded only if the provided `value_size` is
/// larger than [`LARGE_VALUE_THRESHOLD_BYTES`].
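    ///
    /// A minimal sketch; `op_metrics`, `expensive_put`, and `cheap_get` are illustrative
    /// placeholders:
    ///
    /// ```ignore
    /// // The 8 KiB write is timed because it exceeds the threshold; the read is not.
    /// let written = op_metrics.record(Some(8192), || expensive_put());
    /// let read = op_metrics.record(None, || cheap_get());
    /// ```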
pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
self.calls_total.increment(1);
        // Record the duration only for large values to avoid the performance hit of a clock
        // syscall on small operations.
if value_size.is_some_and(|size| size > LARGE_VALUE_THRESHOLD_BYTES) {
let start = Instant::now();
let result = f();
self.large_value_duration_seconds.record(start.elapsed());
result
} else {
f()
}
}
}
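
// Illustrative sanity checks (a sketch, assuming no metrics recorder is installed, in which
// case handle creation and recording are no-ops).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn record_operation_returns_closure_result() {
        let metrics = DatabaseEnvMetrics::new();
        // A label that does not match any pre-populated table falls back to running the
        // closure without recording anything.
        let value = metrics.record_operation("NonExistentTable", Operation::Get, None, || 42);
        assert_eq!(value, 42);
    }

    #[test]
    fn label_strings_match_documented_values() {
        assert_eq!(TransactionMode::ReadOnly.as_str(), "read-only");
        assert_eq!(TransactionOutcome::Commit.as_str(), "commit");
        assert_eq!(Operation::CursorAppendDup.as_str(), "cursor-append-dup");
    }
}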