1use crate::tree::{error::InsertBlockFatalError, TreeOutcome};
2use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
3use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
4use reth_errors::ProviderError;
5use reth_evm::metrics::ExecutorMetrics;
6use reth_execution_types::BlockExecutionOutput;
7use reth_metrics::{
8 metrics::{Counter, Gauge, Histogram},
9 Metrics,
10};
11use reth_primitives_traits::{constants::gas_units::MEGAGAS, FastInstant as Instant};
12use reth_trie::updates::TrieUpdates;
13use std::time::Duration;
14
15const GAS_BUCKET_THRESHOLDS: [u64; 5] =
18 [5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
19
20const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
22
23#[derive(Debug, Default)]
25pub struct EngineApiMetrics {
26 pub engine: EngineMetrics,
28 pub executor: ExecutorMetrics,
30 pub block_validation: BlockValidationMetrics,
32 pub tree: TreeMetrics,
34 #[allow(dead_code)]
36 pub(crate) bal: BalMetrics,
37 pub(crate) execution_gas_buckets: ExecutionGasBucketMetrics,
39 pub(crate) block_validation_gas_buckets: BlockValidationGasBucketMetrics,
41}
42
43impl EngineApiMetrics {
44 pub fn record_block_execution<R>(
49 &self,
50 output: &BlockExecutionOutput<R>,
51 execution_duration: Duration,
52 ) {
53 let execution_secs = execution_duration.as_secs_f64();
54 let gas_used = output.result.gas_used;
55
56 self.executor.gas_processed_total.increment(gas_used);
58 self.executor.gas_per_second.set(gas_used as f64 / execution_secs);
59 self.executor.gas_used_histogram.record(gas_used as f64);
60 self.executor.execution_histogram.record(execution_secs);
61 self.executor.execution_duration.set(execution_secs);
62
63 let accounts = output.state.state.len();
65 let storage_slots =
66 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
67 let bytecodes = output.state.contracts.len();
68
69 self.executor.accounts_updated_histogram.record(accounts as f64);
70 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
71 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
72 }
73
74 pub const fn executor_metrics(&self) -> &ExecutorMetrics {
76 &self.executor
77 }
78
79 pub fn record_pre_execution(&self, elapsed: Duration) {
81 self.executor.pre_execution_histogram.record(elapsed);
82 }
83
84 pub fn record_post_execution(&self, elapsed: Duration) {
86 self.executor.post_execution_histogram.record(elapsed);
87 }
88
89 pub fn record_block_execution_gas_bucket(&self, gas_used: u64, elapsed: Duration) {
91 let idx = GasBucketMetrics::bucket_index(gas_used);
92 self.execution_gas_buckets.buckets[idx]
93 .execution_gas_bucket_histogram
94 .record(elapsed.as_secs_f64());
95 }
96
97 pub fn record_state_root_gas_bucket(&self, gas_used: u64, elapsed_secs: f64) {
99 let idx = GasBucketMetrics::bucket_index(gas_used);
100 self.block_validation_gas_buckets.buckets[idx]
101 .state_root_gas_bucket_histogram
102 .record(elapsed_secs);
103 }
104
105 pub fn record_transaction_wait(&self, elapsed: Duration) {
107 self.executor.transaction_wait_histogram.record(elapsed);
108 }
109
110 pub fn record_transaction_execution(&self, elapsed: Duration) {
112 self.executor.transaction_execution_histogram.record(elapsed);
113 }
114}
115
116#[derive(Metrics)]
118#[metrics(scope = "blockchain_tree")]
119pub struct TreeMetrics {
120 pub canonical_chain_height: Gauge,
122 #[metric(skip)]
124 pub reorgs: ReorgMetrics,
125 pub latest_reorg_depth: Gauge,
127 pub safe_block_height: Gauge,
129 pub finalized_block_height: Gauge,
131}
132
133#[derive(Debug)]
135pub struct ReorgMetrics {
136 pub head: Counter,
138 pub safe: Counter,
140 pub finalized: Counter,
142}
143
144impl Default for ReorgMetrics {
145 fn default() -> Self {
146 Self {
147 head: metrics::counter!("blockchain_tree_reorgs", "commitment" => "head"),
148 safe: metrics::counter!("blockchain_tree_reorgs", "commitment" => "safe"),
149 finalized: metrics::counter!("blockchain_tree_reorgs", "commitment" => "finalized"),
150 }
151 }
152}
153
154#[derive(Metrics)]
156#[metrics(scope = "consensus.engine.beacon")]
157pub struct EngineMetrics {
158 #[metric(skip)]
160 pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
161 #[metric(skip)]
163 pub(crate) new_payload: NewPayloadStatusMetrics,
164 pub(crate) executed_blocks: Gauge,
166 pub(crate) inserted_already_executed_blocks: Counter,
168 pub(crate) pipeline_runs: Counter,
170 pub(crate) executed_new_block_cache_miss: Counter,
172 pub(crate) persistence_duration: Histogram,
174 pub(crate) backpressure_active: Gauge,
176 pub(crate) backpressure_stall_duration: Histogram,
178 pub(crate) failed_new_payload_response_deliveries: Counter,
184 pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
186 pub(crate) block_insert_total_duration: Histogram,
188}
189
190#[derive(Metrics)]
192#[metrics(scope = "consensus.engine.beacon")]
193pub(crate) struct ForkchoiceUpdatedMetrics {
194 #[metric(skip)]
196 pub(crate) latest_finish_at: Option<Instant>,
197 #[metric(skip)]
199 pub(crate) latest_start_at: Option<Instant>,
200 pub(crate) forkchoice_updated_messages: Counter,
202 pub(crate) forkchoice_with_attributes_updated_messages: Counter,
204 pub(crate) forkchoice_updated_valid: Counter,
207 pub(crate) forkchoice_updated_invalid: Counter,
210 pub(crate) forkchoice_updated_syncing: Counter,
213 pub(crate) forkchoice_updated_error: Counter,
216 pub(crate) forkchoice_updated_latency: Histogram,
218 pub(crate) forkchoice_updated_last: Gauge,
220 pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
222 pub(crate) time_between_forkchoice_updated: Histogram,
225 pub(crate) forkchoice_updated_interval: Histogram,
228}
229
230impl ForkchoiceUpdatedMetrics {
231 pub(crate) fn update_response_metrics(
233 &mut self,
234 start: Instant,
235 latest_new_payload_at: &mut Option<Instant>,
236 has_attrs: bool,
237 result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
238 ) {
239 let finish = Instant::now();
240 let elapsed = finish - start;
241
242 if let Some(prev_finish) = self.latest_finish_at {
243 self.time_between_forkchoice_updated.record(start - prev_finish);
244 }
245 if let Some(prev_start) = self.latest_start_at {
246 self.forkchoice_updated_interval.record(start - prev_start);
247 }
248 self.latest_finish_at = Some(finish);
249 self.latest_start_at = Some(start);
250
251 match result {
252 Ok(outcome) => match outcome.outcome.forkchoice_status() {
253 ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
254 ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
255 ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
256 },
257 Err(_) => self.forkchoice_updated_error.increment(1),
258 }
259 self.forkchoice_updated_messages.increment(1);
260 if has_attrs {
261 self.forkchoice_with_attributes_updated_messages.increment(1);
262 }
263 self.forkchoice_updated_latency.record(elapsed);
264 self.forkchoice_updated_last.set(elapsed);
265 if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
266 self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
267 }
268 }
269}
270
271#[derive(Clone, Metrics)]
273#[metrics(scope = "consensus.engine.beacon")]
274pub(crate) struct NewPayloadGasBucketMetrics {
275 pub(crate) new_payload_gas_bucket_latency: Histogram,
277 pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
279}
280
281#[derive(Debug)]
283pub(crate) struct GasBucketMetrics {
284 buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
285}
286
287impl Default for GasBucketMetrics {
288 fn default() -> Self {
289 Self {
290 buckets: std::array::from_fn(|i| {
291 let label = Self::bucket_label(i);
292 NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
293 }),
294 }
295 }
296}
297
298impl GasBucketMetrics {
299 fn record(&self, gas_used: u64, elapsed: Duration) {
300 let idx = Self::bucket_index(gas_used);
301 self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
302 self.buckets[idx]
303 .new_payload_gas_bucket_gas_per_second
304 .record(gas_used as f64 / elapsed.as_secs_f64());
305 }
306
307 pub(crate) fn bucket_index(gas_used: u64) -> usize {
309 GAS_BUCKET_THRESHOLDS
310 .iter()
311 .position(|&threshold| gas_used < threshold)
312 .unwrap_or(GAS_BUCKET_THRESHOLDS.len())
313 }
314
315 pub(crate) fn bucket_label(index: usize) -> String {
317 if index == 0 {
318 let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
319 format!("<{hi}M")
320 } else if index < GAS_BUCKET_THRESHOLDS.len() {
321 let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
322 let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
323 format!("{lo}-{hi}M")
324 } else {
325 let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
326 format!(">{lo}M")
327 }
328 }
329}
330
331#[derive(Clone, Metrics)]
333#[metrics(scope = "sync.execution")]
334pub(crate) struct ExecutionGasBucketSeries {
335 pub(crate) execution_gas_bucket_histogram: Histogram,
337}
338
339#[derive(Debug)]
341pub(crate) struct ExecutionGasBucketMetrics {
342 buckets: [ExecutionGasBucketSeries; NUM_GAS_BUCKETS],
343}
344
345impl Default for ExecutionGasBucketMetrics {
346 fn default() -> Self {
347 Self {
348 buckets: std::array::from_fn(|i| {
349 let label = GasBucketMetrics::bucket_label(i);
350 ExecutionGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
351 }),
352 }
353 }
354}
355
356#[derive(Clone, Metrics)]
358#[metrics(scope = "sync.block_validation")]
359pub(crate) struct BlockValidationGasBucketSeries {
360 pub(crate) state_root_gas_bucket_histogram: Histogram,
362}
363
364#[derive(Debug)]
366pub(crate) struct BlockValidationGasBucketMetrics {
367 buckets: [BlockValidationGasBucketSeries; NUM_GAS_BUCKETS],
368}
369
370impl Default for BlockValidationGasBucketMetrics {
371 fn default() -> Self {
372 Self {
373 buckets: std::array::from_fn(|i| {
374 let label = GasBucketMetrics::bucket_label(i);
375 BlockValidationGasBucketSeries::new_with_labels(&[("gas_bucket", label)])
376 }),
377 }
378 }
379}
380
381#[derive(Metrics)]
383#[metrics(scope = "consensus.engine.beacon")]
384pub(crate) struct NewPayloadStatusMetrics {
385 #[metric(skip)]
387 pub(crate) latest_finish_at: Option<Instant>,
388 #[metric(skip)]
390 pub(crate) latest_start_at: Option<Instant>,
391 #[metric(skip)]
393 pub(crate) gas_bucket: GasBucketMetrics,
394 pub(crate) new_payload_messages: Counter,
396 pub(crate) new_payload_valid: Counter,
399 pub(crate) new_payload_invalid: Counter,
402 pub(crate) new_payload_syncing: Counter,
405 pub(crate) new_payload_accepted: Counter,
408 pub(crate) new_payload_error: Counter,
411 pub(crate) new_payload_total_gas: Histogram,
413 pub(crate) new_payload_total_gas_last: Gauge,
415 pub(crate) new_payload_gas_per_second: Histogram,
417 pub(crate) new_payload_gas_per_second_last: Gauge,
419 pub(crate) new_payload_latency: Histogram,
421 pub(crate) new_payload_last: Gauge,
423 pub(crate) time_between_new_payloads: Histogram,
425 pub(crate) new_payload_interval: Histogram,
427 pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
429}
430
431impl NewPayloadStatusMetrics {
432 pub(crate) fn update_response_metrics(
434 &mut self,
435 start: Instant,
436 latest_forkchoice_updated_at: &mut Option<Instant>,
437 result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
438 gas_used: u64,
439 ) {
440 let finish = Instant::now();
441 let elapsed = finish - start;
442
443 if let Some(prev_finish) = self.latest_finish_at {
444 self.time_between_new_payloads.record(start - prev_finish);
445 }
446 if let Some(prev_start) = self.latest_start_at {
447 self.new_payload_interval.record(start - prev_start);
448 }
449 self.latest_finish_at = Some(finish);
450 self.latest_start_at = Some(start);
451 match result {
452 Ok(outcome) => match outcome.outcome.status {
453 PayloadStatusEnum::Valid => {
454 self.new_payload_valid.increment(1);
455 if !outcome.already_seen {
456 self.new_payload_total_gas.record(gas_used as f64);
457 self.new_payload_total_gas_last.set(gas_used as f64);
458 let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
459 self.new_payload_gas_per_second.record(gas_per_second);
460 self.new_payload_gas_per_second_last.set(gas_per_second);
461
462 self.new_payload_latency.record(elapsed);
463 self.new_payload_last.set(elapsed);
464 self.gas_bucket.record(gas_used, elapsed);
465 }
466 }
467 PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
468 PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
469 PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
470 },
471 Err(_) => self.new_payload_error.increment(1),
472 }
473 self.new_payload_messages.increment(1);
474 if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
475 self.forkchoice_updated_new_payload_time_diff
476 .record(start - latest_forkchoice_updated_at);
477 }
478 }
479}
480
481#[allow(dead_code)]
485#[derive(Metrics, Clone)]
486#[metrics(scope = "execution.block_access_list")]
487pub(crate) struct BalMetrics {
488 pub(crate) size_bytes: Gauge,
490 pub(crate) valid_total: Counter,
492 pub(crate) invalid_total: Counter,
494 pub(crate) validation_time_seconds: Histogram,
496 pub(crate) account_changes: Gauge,
498 pub(crate) storage_changes: Gauge,
500 pub(crate) balance_changes: Gauge,
502 pub(crate) nonce_changes: Gauge,
504 pub(crate) code_changes: Gauge,
506}
507
508#[derive(Metrics, Clone)]
510#[metrics(scope = "sync.block_validation")]
511pub struct BlockValidationMetrics {
512 pub state_root_storage_tries_updated_total: Counter,
514 pub state_root_parallel_fallback_total: Counter,
516 pub state_root_task_fallback_success_total: Counter,
518 pub state_root_task_timeout_total: Counter,
520 pub state_root_duration: Gauge,
522 pub state_root_histogram: Histogram,
524 pub deferred_trie_compute_duration: Histogram,
526 pub payload_validation_duration: Gauge,
528 pub payload_validation_histogram: Histogram,
530 pub spawn_payload_processor: Histogram,
532 pub post_execution_validation_duration: Histogram,
534 pub total_duration: Histogram,
536 pub hashed_post_state_size: Histogram,
538 pub trie_updates_sorted_size: Histogram,
540 pub anchored_overlay_trie_updates_size: Histogram,
542 pub anchored_overlay_hashed_state_size: Histogram,
544}
545
546impl BlockValidationMetrics {
547 pub fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
549 self.state_root_storage_tries_updated_total
550 .increment(trie_output.storage_tries_ref().len() as u64);
551 self.state_root_duration.set(elapsed_as_secs);
552 self.state_root_histogram.record(elapsed_as_secs);
553 }
554
555 pub fn record_payload_validation(&self, elapsed_as_secs: f64) {
558 self.payload_validation_duration.set(elapsed_as_secs);
559 self.payload_validation_histogram.record(elapsed_as_secs);
560 }
561}
562
563#[derive(Metrics)]
565#[metrics(scope = "blockchain_tree.block_buffer")]
566pub(crate) struct BlockBufferMetrics {
567 pub blocks: Gauge,
569}
570
571#[cfg(test)]
572mod tests {
573 use super::*;
574 use alloy_eips::eip7685::Requests;
575 use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
576 use reth_ethereum_primitives::Receipt;
577 use reth_execution_types::BlockExecutionResult;
578 use reth_revm::db::BundleState;
579
580 fn setup_test_recorder() -> Snapshotter {
581 let recorder = DebuggingRecorder::new();
582 let snapshotter = recorder.snapshotter();
583 recorder.install().unwrap();
584 snapshotter
585 }
586
587 #[test]
588 fn test_record_block_execution_metrics() {
589 let snapshotter = setup_test_recorder();
590 let metrics = EngineApiMetrics::default();
591
592 metrics.executor.gas_processed_total.increment(0);
594 metrics.executor.gas_per_second.set(0.0);
595 metrics.executor.gas_used_histogram.record(0.0);
596
597 let output = BlockExecutionOutput::<Receipt> {
598 state: BundleState::default(),
599 result: BlockExecutionResult {
600 receipts: vec![],
601 requests: Requests::default(),
602 gas_used: 21000,
603 blob_gas_used: 0,
604 },
605 };
606
607 metrics.record_block_execution(&output, Duration::from_millis(100));
608
609 let snapshot = snapshotter.snapshot().into_vec();
610
611 let mut found_metrics = false;
613 for (key, _unit, _desc, _value) in snapshot {
614 let metric_name = key.key().name();
615 if metric_name.starts_with("sync.execution") {
616 found_metrics = true;
617 break;
618 }
619 }
620
621 assert!(found_metrics, "Expected to find sync.execution metrics");
622 }
623}