1use crate::tree::{error::InsertBlockFatalError, TreeOutcome};
2use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
3use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
4use reth_errors::ProviderError;
5use reth_evm::metrics::ExecutorMetrics;
6use reth_execution_types::BlockExecutionOutput;
7use reth_metrics::{
8 metrics::{Counter, Gauge, Histogram},
9 Metrics,
10};
11use reth_primitives_traits::{constants::gas_units::MEGAGAS, FastInstant as Instant};
12use reth_trie::updates::TrieUpdates;
13use std::time::Duration;
14
15const GAS_BUCKET_THRESHOLDS: [u64; 5] =
18 [5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
19
20const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
22
23#[derive(Debug, Default)]
25pub struct EngineApiMetrics {
26 pub engine: EngineMetrics,
28 pub executor: ExecutorMetrics,
30 pub block_validation: BlockValidationMetrics,
32 pub tree: TreeMetrics,
34 #[allow(dead_code)]
36 pub(crate) bal: BalMetrics,
37 pub(crate) execution_gas_buckets: ExecutionGasBucketMetrics,
39 pub(crate) block_validation_gas_buckets: BlockValidationGasBucketMetrics,
41}
42
43impl EngineApiMetrics {
44 pub fn record_block_execution<R>(
49 &self,
50 output: &BlockExecutionOutput<R>,
51 execution_duration: Duration,
52 ) {
53 let execution_secs = execution_duration.as_secs_f64();
54 let gas_used = output.result.gas_used;
55
56 self.executor.gas_processed_total.increment(gas_used);
58 self.executor.gas_per_second.set(gas_used as f64 / execution_secs);
59 self.executor.gas_used_histogram.record(gas_used as f64);
60 self.executor.execution_histogram.record(execution_secs);
61 self.executor.execution_duration.set(execution_secs);
62
63 let accounts = output.state.state.len();
65 let storage_slots =
66 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
67 let bytecodes = output.state.contracts.len();
68
69 self.executor.accounts_updated_histogram.record(accounts as f64);
70 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
71 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
72 }
73
74 pub const fn executor_metrics(&self) -> &ExecutorMetrics {
76 &self.executor
77 }
78
79 pub fn record_pre_execution(&self, elapsed: Duration) {
81 self.executor.pre_execution_histogram.record(elapsed);
82 }
83
84 pub fn record_post_execution(&self, elapsed: Duration) {
86 self.executor.post_execution_histogram.record(elapsed);
87 }
88
89 pub fn record_block_execution_gas_bucket(&self, gas_used: u64, elapsed: Duration) {
91 let idx = GasBucketMetrics::bucket_index(gas_used);
92 self.execution_gas_buckets.buckets[idx]
93 .execution_gas_bucket_histogram
94 .record(elapsed.as_secs_f64());
95 }
96
97 pub fn record_state_root_gas_bucket(&self, gas_used: u64, elapsed_secs: f64) {
99 let idx = GasBucketMetrics::bucket_index(gas_used);
100 self.block_validation_gas_buckets.buckets[idx]
101 .state_root_gas_bucket_histogram
102 .record(elapsed_secs);
103 }
104
105 pub fn record_transaction_wait(&self, elapsed: Duration) {
107 self.executor.transaction_wait_histogram.record(elapsed);
108 }
109
110 pub fn record_transaction_execution(&self, elapsed: Duration) {
112 self.executor.transaction_execution_histogram.record(elapsed);
113 }
114}
115
116#[derive(Metrics)]
118#[metrics(scope = "blockchain_tree")]
119pub struct TreeMetrics {
120 pub canonical_chain_height: Gauge,
122 #[metric(skip)]
124 pub reorgs: ReorgMetrics,
125 pub latest_reorg_depth: Gauge,
127 pub safe_block_height: Gauge,
129 pub finalized_block_height: Gauge,
131}
132
133#[derive(Debug)]
135pub struct ReorgMetrics {
136 pub head: Counter,
138 pub safe: Counter,
140 pub finalized: Counter,
142}
143
144impl Default for ReorgMetrics {
145 fn default() -> Self {
146 Self {
147 head: metrics::counter!("blockchain_tree_reorgs", "commitment" => "head"),
148 safe: metrics::counter!("blockchain_tree_reorgs", "commitment" => "safe"),
149 finalized: metrics::counter!("blockchain_tree_reorgs", "commitment" => "finalized"),
150 }
151 }
152}
153
154#[derive(Metrics)]
156#[metrics(scope = "consensus.engine.beacon")]
157pub struct EngineMetrics {
158 #[metric(skip)]
160 pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
161 #[metric(skip)]
163 pub(crate) new_payload: NewPayloadStatusMetrics,
164 pub(crate) executed_blocks: Gauge,
166 pub(crate) inserted_already_executed_blocks: Counter,
168 pub(crate) pipeline_runs: Counter,
170 pub(crate) executed_new_block_cache_miss: Counter,
172 pub(crate) persistence_duration: Histogram,
174 pub(crate) failed_new_payload_response_deliveries: Counter,
180 pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
182 pub(crate) block_insert_total_duration: Histogram,
184}
185
186#[derive(Metrics)]
188#[metrics(scope = "consensus.engine.beacon")]
189pub(crate) struct ForkchoiceUpdatedMetrics {
190 #[metric(skip)]
192 pub(crate) latest_finish_at: Option<Instant>,
193 #[metric(skip)]
195 pub(crate) latest_start_at: Option<Instant>,
196 pub(crate) forkchoice_updated_messages: Counter,
198 pub(crate) forkchoice_with_attributes_updated_messages: Counter,
200 pub(crate) forkchoice_updated_valid: Counter,
203 pub(crate) forkchoice_updated_invalid: Counter,
206 pub(crate) forkchoice_updated_syncing: Counter,
209 pub(crate) forkchoice_updated_error: Counter,
212 pub(crate) forkchoice_updated_latency: Histogram,
214 pub(crate) forkchoice_updated_last: Gauge,
216 pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
218 pub(crate) time_between_forkchoice_updated: Histogram,
221 pub(crate) forkchoice_updated_interval: Histogram,
224}
225
226impl ForkchoiceUpdatedMetrics {
227 pub(crate) fn update_response_metrics(
229 &mut self,
230 start: Instant,
231 latest_new_payload_at: &mut Option<Instant>,
232 has_attrs: bool,
233 result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
234 ) {
235 let finish = Instant::now();
236 let elapsed = finish - start;
237
238 if let Some(prev_finish) = self.latest_finish_at {
239 self.time_between_forkchoice_updated.record(start - prev_finish);
240 }
241 if let Some(prev_start) = self.latest_start_at {
242 self.forkchoice_updated_interval.record(start - prev_start);
243 }
244 self.latest_finish_at = Some(finish);
245 self.latest_start_at = Some(start);
246
247 match result {
248 Ok(outcome) => match outcome.outcome.forkchoice_status() {
249 ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
250 ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
251 ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
252 },
253 Err(_) => self.forkchoice_updated_error.increment(1),
254 }
255 self.forkchoice_updated_messages.increment(1);
256 if has_attrs {
257 self.forkchoice_with_attributes_updated_messages.increment(1);
258 }
259 self.forkchoice_updated_latency.record(elapsed);
260 self.forkchoice_updated_last.set(elapsed);
261 if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
262 self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
263 }
264 }
265}
266
267#[derive(Clone, Metrics)]
269#[metrics(scope = "consensus.engine.beacon")]
270pub(crate) struct NewPayloadGasBucketMetrics {
271 pub(crate) new_payload_gas_bucket_latency: Histogram,
273 pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
275}
276
277#[derive(Debug)]
279pub(crate) struct GasBucketMetrics {
280 buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
281}
282
283impl Default for GasBucketMetrics {
284 fn default() -> Self {
285 Self {
286 buckets: std::array::from_fn(|i| {
287 let label = Self::bucket_label(i);
288 NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
289 }),
290 }
291 }
292}
293
294impl GasBucketMetrics {
295 fn record(&self, gas_used: u64, elapsed: Duration) {
296 let idx = Self::bucket_index(gas_used);
297 self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
298 self.buckets[idx]
299 .new_payload_gas_bucket_gas_per_second
300 .record(gas_used as f64 / elapsed.as_secs_f64());
301 }
302
303 pub(crate) fn bucket_index(gas_used: u64) -> usize {
305 GAS_BUCKET_THRESHOLDS
306 .iter()
307 .position(|&threshold| gas_used < threshold)
308 .unwrap_or(GAS_BUCKET_THRESHOLDS.len())
309 }
310
311 pub(crate) fn bucket_label(index: usize) -> String {
313 if index == 0 {
314 let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
315 format!("<{hi}M")
316 } else if index < GAS_BUCKET_THRESHOLDS.len() {
317 let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
318 let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
319 format!("{lo}-{hi}M")
320 } else {
321 let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
322 format!(">{lo}M")
323 }
324 }
325}
326
327#[derive(Clone, Metrics)]
329#[metrics(scope = "sync.execution")]
330pub(crate) struct ExecutionGasBucketSeries {
331 pub(crate) execution_gas_bucket_histogram: Histogram,
333}
334
335#[derive(Debug)]
337pub(crate) struct ExecutionGasBucketMetrics {
338 buckets: [ExecutionGasBucketSeries; NUM_GAS_BUCKETS],
339}
340
341impl Default for ExecutionGasBucketMetrics {
342 fn default() -> Self {
343 Self {
344 buckets: std::array::from_fn(|i| {
345 let label = GasBucketMetrics::bucket_label(i);
346 ExecutionGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
347 }),
348 }
349 }
350}
351
352#[derive(Clone, Metrics)]
354#[metrics(scope = "sync.block_validation")]
355pub(crate) struct BlockValidationGasBucketSeries {
356 pub(crate) state_root_gas_bucket_histogram: Histogram,
358}
359
360#[derive(Debug)]
362pub(crate) struct BlockValidationGasBucketMetrics {
363 buckets: [BlockValidationGasBucketSeries; NUM_GAS_BUCKETS],
364}
365
366impl Default for BlockValidationGasBucketMetrics {
367 fn default() -> Self {
368 Self {
369 buckets: std::array::from_fn(|i| {
370 let label = GasBucketMetrics::bucket_label(i);
371 BlockValidationGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
372 }),
373 }
374 }
375}
376
377#[derive(Metrics)]
379#[metrics(scope = "consensus.engine.beacon")]
380pub(crate) struct NewPayloadStatusMetrics {
381 #[metric(skip)]
383 pub(crate) latest_finish_at: Option<Instant>,
384 #[metric(skip)]
386 pub(crate) latest_start_at: Option<Instant>,
387 #[metric(skip)]
389 pub(crate) gas_bucket: GasBucketMetrics,
390 pub(crate) new_payload_messages: Counter,
392 pub(crate) new_payload_valid: Counter,
395 pub(crate) new_payload_invalid: Counter,
398 pub(crate) new_payload_syncing: Counter,
401 pub(crate) new_payload_accepted: Counter,
404 pub(crate) new_payload_error: Counter,
407 pub(crate) new_payload_total_gas: Histogram,
409 pub(crate) new_payload_total_gas_last: Gauge,
411 pub(crate) new_payload_gas_per_second: Histogram,
413 pub(crate) new_payload_gas_per_second_last: Gauge,
415 pub(crate) new_payload_latency: Histogram,
417 pub(crate) new_payload_last: Gauge,
419 pub(crate) time_between_new_payloads: Histogram,
421 pub(crate) new_payload_interval: Histogram,
423 pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
425}
426
427impl NewPayloadStatusMetrics {
428 pub(crate) fn update_response_metrics(
430 &mut self,
431 start: Instant,
432 latest_forkchoice_updated_at: &mut Option<Instant>,
433 result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
434 gas_used: u64,
435 ) {
436 let finish = Instant::now();
437 let elapsed = finish - start;
438
439 if let Some(prev_finish) = self.latest_finish_at {
440 self.time_between_new_payloads.record(start - prev_finish);
441 }
442 if let Some(prev_start) = self.latest_start_at {
443 self.new_payload_interval.record(start - prev_start);
444 }
445 self.latest_finish_at = Some(finish);
446 self.latest_start_at = Some(start);
447 match result {
448 Ok(outcome) => match outcome.outcome.status {
449 PayloadStatusEnum::Valid => {
450 self.new_payload_valid.increment(1);
451 if !outcome.already_seen {
452 self.new_payload_total_gas.record(gas_used as f64);
453 self.new_payload_total_gas_last.set(gas_used as f64);
454 let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
455 self.new_payload_gas_per_second.record(gas_per_second);
456 self.new_payload_gas_per_second_last.set(gas_per_second);
457
458 self.new_payload_latency.record(elapsed);
459 self.new_payload_last.set(elapsed);
460 self.gas_bucket.record(gas_used, elapsed);
461 }
462 }
463 PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
464 PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
465 PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
466 },
467 Err(_) => self.new_payload_error.increment(1),
468 }
469 self.new_payload_messages.increment(1);
470 if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
471 self.forkchoice_updated_new_payload_time_diff
472 .record(start - latest_forkchoice_updated_at);
473 }
474 }
475}
476
477#[allow(dead_code)]
481#[derive(Metrics, Clone)]
482#[metrics(scope = "execution.block_access_list")]
483pub(crate) struct BalMetrics {
484 pub(crate) size_bytes: Gauge,
486 pub(crate) valid_total: Counter,
488 pub(crate) invalid_total: Counter,
490 pub(crate) validation_time_seconds: Histogram,
492 pub(crate) account_changes: Gauge,
494 pub(crate) storage_changes: Gauge,
496 pub(crate) balance_changes: Gauge,
498 pub(crate) nonce_changes: Gauge,
500 pub(crate) code_changes: Gauge,
502}
503
504#[derive(Metrics, Clone)]
506#[metrics(scope = "sync.block_validation")]
507pub struct BlockValidationMetrics {
508 pub state_root_storage_tries_updated_total: Counter,
510 pub state_root_parallel_fallback_total: Counter,
512 pub state_root_task_fallback_success_total: Counter,
514 pub state_root_task_timeout_total: Counter,
516 pub state_root_duration: Gauge,
518 pub state_root_histogram: Histogram,
520 pub deferred_trie_compute_duration: Histogram,
522 pub payload_validation_duration: Gauge,
524 pub payload_validation_histogram: Histogram,
526 pub spawn_payload_processor: Histogram,
528 pub post_execution_validation_duration: Histogram,
530 pub total_duration: Histogram,
532 pub hashed_post_state_size: Histogram,
534 pub trie_updates_sorted_size: Histogram,
536 pub anchored_overlay_trie_updates_size: Histogram,
538 pub anchored_overlay_hashed_state_size: Histogram,
540}
541
542impl BlockValidationMetrics {
543 pub fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
545 self.state_root_storage_tries_updated_total
546 .increment(trie_output.storage_tries_ref().len() as u64);
547 self.state_root_duration.set(elapsed_as_secs);
548 self.state_root_histogram.record(elapsed_as_secs);
549 }
550
551 pub fn record_payload_validation(&self, elapsed_as_secs: f64) {
554 self.payload_validation_duration.set(elapsed_as_secs);
555 self.payload_validation_histogram.record(elapsed_as_secs);
556 }
557}
558
559#[derive(Metrics)]
561#[metrics(scope = "blockchain_tree.block_buffer")]
562pub(crate) struct BlockBufferMetrics {
563 pub blocks: Gauge,
565}
566
567#[cfg(test)]
568mod tests {
569 use super::*;
570 use alloy_eips::eip7685::Requests;
571 use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
572 use reth_ethereum_primitives::Receipt;
573 use reth_execution_types::BlockExecutionResult;
574 use reth_revm::db::BundleState;
575
576 fn setup_test_recorder() -> Snapshotter {
577 let recorder = DebuggingRecorder::new();
578 let snapshotter = recorder.snapshotter();
579 recorder.install().unwrap();
580 snapshotter
581 }
582
583 #[test]
584 fn test_record_block_execution_metrics() {
585 let snapshotter = setup_test_recorder();
586 let metrics = EngineApiMetrics::default();
587
588 metrics.executor.gas_processed_total.increment(0);
590 metrics.executor.gas_per_second.set(0.0);
591 metrics.executor.gas_used_histogram.record(0.0);
592
593 let output = BlockExecutionOutput::<Receipt> {
594 state: BundleState::default(),
595 result: BlockExecutionResult {
596 receipts: vec![],
597 requests: Requests::default(),
598 gas_used: 21000,
599 blob_gas_used: 0,
600 },
601 };
602
603 metrics.record_block_execution(&output, Duration::from_millis(100));
604
605 let snapshot = snapshotter.snapshot().into_vec();
606
607 let mut found_metrics = false;
609 for (key, _unit, _desc, _value) in snapshot {
610 let metric_name = key.key().name();
611 if metric_name.starts_with("sync.execution") {
612 found_metrics = true;
613 break;
614 }
615 }
616
617 assert!(found_metrics, "Expected to find sync.execution metrics");
618 }
619}