1use crate::tree::{error::InsertBlockFatalError, TreeOutcome};
2use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
3use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
4use reth_errors::ProviderError;
5use reth_evm::metrics::ExecutorMetrics;
6use reth_execution_types::BlockExecutionOutput;
7use reth_metrics::{
8 metrics::{Counter, Gauge, Histogram},
9 Metrics,
10};
11use reth_primitives_traits::constants::gas_units::MEGAGAS;
12use reth_trie::updates::TrieUpdates;
13use std::time::{Duration, Instant};
14
15const GAS_BUCKET_THRESHOLDS: [u64; 5] =
18 [5 * MEGAGAS, 10 * MEGAGAS, 20 * MEGAGAS, 30 * MEGAGAS, 40 * MEGAGAS];
19
20const NUM_GAS_BUCKETS: usize = GAS_BUCKET_THRESHOLDS.len() + 1;
22
23#[derive(Debug, Default)]
25pub struct EngineApiMetrics {
26 pub engine: EngineMetrics,
28 pub executor: ExecutorMetrics,
30 pub block_validation: BlockValidationMetrics,
32 pub tree: TreeMetrics,
34 #[allow(dead_code)]
36 pub(crate) bal: BalMetrics,
37}
38
39impl EngineApiMetrics {
40 pub fn record_block_execution<R>(
45 &self,
46 output: &BlockExecutionOutput<R>,
47 execution_duration: Duration,
48 ) {
49 let execution_secs = execution_duration.as_secs_f64();
50 let gas_used = output.result.gas_used;
51
52 self.executor.gas_processed_total.increment(gas_used);
54 self.executor.gas_per_second.set(gas_used as f64 / execution_secs);
55 self.executor.gas_used_histogram.record(gas_used as f64);
56 self.executor.execution_histogram.record(execution_secs);
57 self.executor.execution_duration.set(execution_secs);
58
59 let accounts = output.state.state.len();
61 let storage_slots =
62 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
63 let bytecodes = output.state.contracts.len();
64
65 self.executor.accounts_updated_histogram.record(accounts as f64);
66 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
67 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
68 }
69
70 pub const fn executor_metrics(&self) -> &ExecutorMetrics {
72 &self.executor
73 }
74
75 pub fn record_pre_execution(&self, elapsed: Duration) {
77 self.executor.pre_execution_histogram.record(elapsed);
78 }
79
80 pub fn record_post_execution(&self, elapsed: Duration) {
82 self.executor.post_execution_histogram.record(elapsed);
83 }
84
85 pub fn record_transaction_wait(&self, elapsed: Duration) {
87 self.executor.transaction_wait_histogram.record(elapsed);
88 }
89
90 pub fn record_transaction_execution(&self, elapsed: Duration) {
92 self.executor.transaction_execution_histogram.record(elapsed);
93 }
94}
95
96#[derive(Metrics)]
98#[metrics(scope = "blockchain_tree")]
99pub struct TreeMetrics {
100 pub canonical_chain_height: Gauge,
102 #[metric(skip)]
104 pub reorgs: ReorgMetrics,
105 pub latest_reorg_depth: Gauge,
107 pub safe_block_height: Gauge,
109 pub finalized_block_height: Gauge,
111}
112
113#[derive(Debug)]
115pub struct ReorgMetrics {
116 pub head: Counter,
118 pub safe: Counter,
120 pub finalized: Counter,
122}
123
124impl Default for ReorgMetrics {
125 fn default() -> Self {
126 Self {
127 head: metrics::counter!("blockchain_tree_reorgs", "commitment" => "head"),
128 safe: metrics::counter!("blockchain_tree_reorgs", "commitment" => "safe"),
129 finalized: metrics::counter!("blockchain_tree_reorgs", "commitment" => "finalized"),
130 }
131 }
132}
133
134#[derive(Metrics)]
136#[metrics(scope = "consensus.engine.beacon")]
137pub struct EngineMetrics {
138 #[metric(skip)]
140 pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
141 #[metric(skip)]
143 pub(crate) new_payload: NewPayloadStatusMetrics,
144 pub(crate) executed_blocks: Gauge,
146 pub(crate) inserted_already_executed_blocks: Counter,
148 pub(crate) pipeline_runs: Counter,
150 pub(crate) executed_new_block_cache_miss: Counter,
152 pub(crate) persistence_duration: Histogram,
154 pub(crate) failed_new_payload_response_deliveries: Counter,
160 pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
162 pub(crate) block_insert_total_duration: Histogram,
164}
165
166#[derive(Metrics)]
168#[metrics(scope = "consensus.engine.beacon")]
169pub(crate) struct ForkchoiceUpdatedMetrics {
170 #[metric(skip)]
172 pub(crate) latest_finish_at: Option<Instant>,
173 #[metric(skip)]
175 pub(crate) latest_start_at: Option<Instant>,
176 pub(crate) forkchoice_updated_messages: Counter,
178 pub(crate) forkchoice_with_attributes_updated_messages: Counter,
180 pub(crate) forkchoice_updated_valid: Counter,
183 pub(crate) forkchoice_updated_invalid: Counter,
186 pub(crate) forkchoice_updated_syncing: Counter,
189 pub(crate) forkchoice_updated_error: Counter,
192 pub(crate) forkchoice_updated_latency: Histogram,
194 pub(crate) forkchoice_updated_last: Gauge,
196 pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
198 pub(crate) time_between_forkchoice_updated: Histogram,
201 pub(crate) forkchoice_updated_interval: Histogram,
204}
205
206impl ForkchoiceUpdatedMetrics {
207 pub(crate) fn update_response_metrics(
209 &mut self,
210 start: Instant,
211 latest_new_payload_at: &mut Option<Instant>,
212 has_attrs: bool,
213 result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
214 ) {
215 let finish = Instant::now();
216 let elapsed = finish - start;
217
218 if let Some(prev_finish) = self.latest_finish_at {
219 self.time_between_forkchoice_updated.record(start - prev_finish);
220 }
221 if let Some(prev_start) = self.latest_start_at {
222 self.forkchoice_updated_interval.record(start - prev_start);
223 }
224 self.latest_finish_at = Some(finish);
225 self.latest_start_at = Some(start);
226
227 match result {
228 Ok(outcome) => match outcome.outcome.forkchoice_status() {
229 ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
230 ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
231 ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
232 },
233 Err(_) => self.forkchoice_updated_error.increment(1),
234 }
235 self.forkchoice_updated_messages.increment(1);
236 if has_attrs {
237 self.forkchoice_with_attributes_updated_messages.increment(1);
238 }
239 self.forkchoice_updated_latency.record(elapsed);
240 self.forkchoice_updated_last.set(elapsed);
241 if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
242 self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
243 }
244 }
245}
246
247#[derive(Clone, Metrics)]
249#[metrics(scope = "consensus.engine.beacon")]
250pub(crate) struct NewPayloadGasBucketMetrics {
251 pub(crate) new_payload_gas_bucket_latency: Histogram,
253 pub(crate) new_payload_gas_bucket_gas_per_second: Histogram,
255}
256
257#[derive(Debug)]
259pub(crate) struct GasBucketMetrics {
260 buckets: [NewPayloadGasBucketMetrics; NUM_GAS_BUCKETS],
261}
262
263impl Default for GasBucketMetrics {
264 fn default() -> Self {
265 Self {
266 buckets: std::array::from_fn(|i| {
267 let label = Self::bucket_label(i);
268 NewPayloadGasBucketMetrics::new_with_labels(&[("gas_bucket", label)])
269 }),
270 }
271 }
272}
273
274impl GasBucketMetrics {
275 fn record(&self, gas_used: u64, elapsed: Duration) {
276 let idx = Self::bucket_index(gas_used);
277 self.buckets[idx].new_payload_gas_bucket_latency.record(elapsed);
278 self.buckets[idx]
279 .new_payload_gas_bucket_gas_per_second
280 .record(gas_used as f64 / elapsed.as_secs_f64());
281 }
282
283 fn bucket_index(gas_used: u64) -> usize {
284 GAS_BUCKET_THRESHOLDS
285 .iter()
286 .position(|&threshold| gas_used < threshold)
287 .unwrap_or(GAS_BUCKET_THRESHOLDS.len())
288 }
289
290 fn bucket_label(index: usize) -> String {
292 if index == 0 {
293 let hi = GAS_BUCKET_THRESHOLDS[0] / MEGAGAS;
294 format!("<{hi}M")
295 } else if index < GAS_BUCKET_THRESHOLDS.len() {
296 let lo = GAS_BUCKET_THRESHOLDS[index - 1] / MEGAGAS;
297 let hi = GAS_BUCKET_THRESHOLDS[index] / MEGAGAS;
298 format!("{lo}-{hi}M")
299 } else {
300 let lo = GAS_BUCKET_THRESHOLDS[GAS_BUCKET_THRESHOLDS.len() - 1] / MEGAGAS;
301 format!(">{lo}M")
302 }
303 }
304}
305
306#[derive(Metrics)]
308#[metrics(scope = "consensus.engine.beacon")]
309pub(crate) struct NewPayloadStatusMetrics {
310 #[metric(skip)]
312 pub(crate) latest_finish_at: Option<Instant>,
313 #[metric(skip)]
315 pub(crate) latest_start_at: Option<Instant>,
316 #[metric(skip)]
318 pub(crate) gas_bucket: GasBucketMetrics,
319 pub(crate) new_payload_messages: Counter,
321 pub(crate) new_payload_valid: Counter,
324 pub(crate) new_payload_invalid: Counter,
327 pub(crate) new_payload_syncing: Counter,
330 pub(crate) new_payload_accepted: Counter,
333 pub(crate) new_payload_error: Counter,
336 pub(crate) new_payload_total_gas: Histogram,
338 pub(crate) new_payload_total_gas_last: Gauge,
340 pub(crate) new_payload_gas_per_second: Histogram,
342 pub(crate) new_payload_gas_per_second_last: Gauge,
344 pub(crate) new_payload_latency: Histogram,
346 pub(crate) new_payload_last: Gauge,
348 pub(crate) time_between_new_payloads: Histogram,
350 pub(crate) new_payload_interval: Histogram,
352 pub(crate) forkchoice_updated_new_payload_time_diff: Histogram,
354}
355
356impl NewPayloadStatusMetrics {
357 pub(crate) fn update_response_metrics(
359 &mut self,
360 start: Instant,
361 latest_forkchoice_updated_at: &mut Option<Instant>,
362 result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
363 gas_used: u64,
364 ) {
365 let finish = Instant::now();
366 let elapsed = finish - start;
367
368 if let Some(prev_finish) = self.latest_finish_at {
369 self.time_between_new_payloads.record(start - prev_finish);
370 }
371 if let Some(prev_start) = self.latest_start_at {
372 self.new_payload_interval.record(start - prev_start);
373 }
374 self.latest_finish_at = Some(finish);
375 self.latest_start_at = Some(start);
376 match result {
377 Ok(outcome) => match outcome.outcome.status {
378 PayloadStatusEnum::Valid => {
379 self.new_payload_valid.increment(1);
380 self.new_payload_total_gas.record(gas_used as f64);
381 self.new_payload_total_gas_last.set(gas_used as f64);
382 let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
383 self.new_payload_gas_per_second.record(gas_per_second);
384 self.new_payload_gas_per_second_last.set(gas_per_second);
385 }
386 PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
387 PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
388 PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
389 },
390 Err(_) => self.new_payload_error.increment(1),
391 }
392 self.new_payload_messages.increment(1);
393 self.new_payload_latency.record(elapsed);
394 self.new_payload_last.set(elapsed);
395 self.gas_bucket.record(gas_used, elapsed);
396 if let Some(latest_forkchoice_updated_at) = latest_forkchoice_updated_at.take() {
397 self.forkchoice_updated_new_payload_time_diff
398 .record(start - latest_forkchoice_updated_at);
399 }
400 }
401}
402
403#[allow(dead_code)]
407#[derive(Metrics, Clone)]
408#[metrics(scope = "execution.block_access_list")]
409pub(crate) struct BalMetrics {
410 pub(crate) size_bytes: Gauge,
412 pub(crate) valid_total: Counter,
414 pub(crate) invalid_total: Counter,
416 pub(crate) validation_time_seconds: Histogram,
418 pub(crate) account_changes: Gauge,
420 pub(crate) storage_changes: Gauge,
422 pub(crate) balance_changes: Gauge,
424 pub(crate) nonce_changes: Gauge,
426 pub(crate) code_changes: Gauge,
428}
429
430#[derive(Metrics, Clone)]
432#[metrics(scope = "sync.block_validation")]
433pub struct BlockValidationMetrics {
434 pub state_root_storage_tries_updated_total: Counter,
436 pub state_root_parallel_fallback_total: Counter,
438 pub state_root_task_fallback_success_total: Counter,
440 pub state_root_task_timeout_total: Counter,
442 pub state_root_duration: Gauge,
444 pub state_root_histogram: Histogram,
446 pub deferred_trie_compute_duration: Histogram,
448 pub payload_validation_duration: Gauge,
450 pub payload_validation_histogram: Histogram,
452 pub spawn_payload_processor: Histogram,
454 pub post_execution_validation_duration: Histogram,
456 pub total_duration: Histogram,
458 pub hashed_post_state_size: Histogram,
460 pub trie_updates_sorted_size: Histogram,
462 pub anchored_overlay_trie_updates_size: Histogram,
464 pub anchored_overlay_hashed_state_size: Histogram,
466}
467
468impl BlockValidationMetrics {
469 pub fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
471 self.state_root_storage_tries_updated_total
472 .increment(trie_output.storage_tries_ref().len() as u64);
473 self.state_root_duration.set(elapsed_as_secs);
474 self.state_root_histogram.record(elapsed_as_secs);
475 }
476
477 pub fn record_payload_validation(&self, elapsed_as_secs: f64) {
480 self.payload_validation_duration.set(elapsed_as_secs);
481 self.payload_validation_histogram.record(elapsed_as_secs);
482 }
483}
484
485#[derive(Metrics)]
487#[metrics(scope = "blockchain_tree.block_buffer")]
488pub(crate) struct BlockBufferMetrics {
489 pub blocks: Gauge,
491}
492
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::Receipt;
    use reth_execution_types::BlockExecutionResult;
    use reth_revm::db::BundleState;

    /// Installs a [`DebuggingRecorder`] as the global recorder and returns its snapshotter.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    /// Recording a block execution must surface at least one `sync.execution*` metric.
    #[test]
    fn test_record_block_execution_metrics() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Touch a few executor metrics up front so they are registered with the recorder.
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let output = BlockExecutionOutput::<Receipt> {
            state: BundleState::default(),
            result: BlockExecutionResult {
                receipts: vec![],
                requests: Requests::default(),
                gas_used: 21000,
                blob_gas_used: 0,
            },
        };

        metrics.record_block_execution(&output, Duration::from_millis(100));

        let found_metrics = snapshotter
            .snapshot()
            .into_vec()
            .into_iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}
545}