1use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome};
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4 block::{BlockExecutor, ExecutableTx},
5 Evm,
6};
7use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
8use core::borrow::BorrowMut;
9use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
10use reth_errors::{BlockExecutionError, ProviderError};
11use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
12use reth_execution_types::BlockExecutionOutput;
13use reth_metrics::{
14 metrics::{Counter, Gauge, Histogram},
15 Metrics,
16};
17use reth_primitives_traits::SignedTransaction;
18use reth_trie::updates::TrieUpdates;
19use revm::database::{states::bundle_state::BundleRetention, State};
20use revm_primitives::Address;
21use std::time::Instant;
22use tracing::{debug_span, trace};
23
/// Aggregates the metric groups recorded by the engine API tree.
#[derive(Debug, Default)]
pub(crate) struct EngineApiMetrics {
    /// Engine API message and response metrics.
    pub(crate) engine: EngineMetrics,
    /// Block executor metrics; updated by `execute_metered`.
    pub(crate) executor: ExecutorMetrics,
    /// Block validation metrics.
    pub(crate) block_validation: BlockValidationMetrics,
    /// Blockchain tree metrics.
    pub tree: TreeMetrics,
}
36
37impl EngineApiMetrics {
38 fn metered<F, R>(&self, f: F) -> R
40 where
41 F: FnOnce() -> (u64, R),
42 {
43 let execute_start = Instant::now();
45 let (gas_used, output) = f();
46 let execution_duration = execute_start.elapsed().as_secs_f64();
47
48 self.executor.gas_processed_total.increment(gas_used);
50 self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
51 self.executor.gas_used_histogram.record(gas_used as f64);
52 self.executor.execution_histogram.record(execution_duration);
53 self.executor.execution_duration.set(execution_duration);
54
55 output
56 }
57
58 pub(crate) fn execute_metered<E, DB>(
64 &self,
65 executor: E,
66 mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
67 transaction_count: usize,
68 state_hook: Box<dyn OnStateHook>,
69 ) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
70 where
71 DB: alloy_evm::Database,
72 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
73 {
74 let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
78
79 let mut senders = Vec::with_capacity(transaction_count);
80 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
81
82 let f = || {
83 let start = Instant::now();
84 debug_span!(target: "engine::tree", "pre execution")
85 .entered()
86 .in_scope(|| executor.apply_pre_execution_changes())?;
87 self.executor.pre_execution_histogram.record(start.elapsed());
88
89 let exec_span = debug_span!(target: "engine::tree", "execution").entered();
90 loop {
91 let start = Instant::now();
92 let Some(tx) = transactions.next() else { break };
93 self.executor.transaction_wait_histogram.record(start.elapsed());
94
95 let tx = tx?;
96 senders.push(*tx.signer());
97
98 let span =
99 debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
100 let enter = span.entered();
101 trace!(target: "engine::tree", "Executing transaction");
102 let start = Instant::now();
103 let gas_used = executor.execute_transaction(tx)?;
104 self.executor.transaction_execution_histogram.record(start.elapsed());
105
106 enter.record("gas_used", gas_used);
108 }
109 drop(exec_span);
110
111 let start = Instant::now();
112 let result = debug_span!(target: "engine::tree", "finish")
113 .entered()
114 .in_scope(|| executor.finish())
115 .map(|(evm, result)| (evm.into_db(), result));
116 self.executor.post_execution_histogram.record(start.elapsed());
117
118 result
119 };
120
121 let (mut db, result) = self.metered(|| {
123 let res = f();
124 let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
125 (gas_used, res)
126 })?;
127
128 debug_span!(target: "engine::tree", "merge transitions")
130 .entered()
131 .in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts));
132 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
133
134 let accounts = output.state.state.len();
136 let storage_slots =
137 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
138 let bytecodes = output.state.contracts.len();
139
140 self.executor.accounts_updated_histogram.record(accounts as f64);
141 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
142 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
143
144 Ok((output, senders))
145 }
146}
147
/// Metrics for the blockchain tree.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub(crate) struct TreeMetrics {
    /// The height of the canonical chain
    pub canonical_chain_height: Gauge,
    /// The number of reorgs
    pub reorgs: Counter,
    /// The depth of the latest reorg
    pub latest_reorg_depth: Gauge,
    /// The current safe block height
    pub safe_block_height: Gauge,
    /// The current finalized block height
    pub finalized_block_height: Gauge,
}
163
/// Metrics for the engine API handler.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineMetrics {
    /// Metrics about forkchoice-updated responses
    #[metric(skip)]
    pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
    /// Metrics about new-payload responses
    #[metric(skip)]
    pub(crate) new_payload: NewPayloadStatusMetrics,
    /// Number of executed blocks
    pub(crate) executed_blocks: Gauge,
    /// Number of already executed blocks that were inserted
    pub(crate) inserted_already_executed_blocks: Counter,
    /// Number of pipeline runs
    pub(crate) pipeline_runs: Counter,
    /// Number of cache misses for executed new blocks
    pub(crate) executed_new_block_cache_miss: Counter,
    /// Histogram of persistence durations
    pub(crate) persistence_duration: Histogram,
    /// Number of new-payload responses that failed to be delivered
    pub(crate) failed_new_payload_response_deliveries: Counter,
    /// Number of forkchoice-updated responses that failed to be delivered
    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
    /// Histogram of total block insert durations
    pub(crate) block_insert_total_duration: Histogram,
}
195
/// Metrics for forkchoice-updated responses; updated by `update_response_metrics`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ForkchoiceUpdatedMetrics {
    /// Total number of forkchoice-updated messages handled
    pub(crate) forkchoice_updated_messages: Counter,
    /// Total number of forkchoice-updated messages that carried attributes
    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
    /// Number of forkchoice-updated responses with a valid status
    pub(crate) forkchoice_updated_valid: Counter,
    /// Number of forkchoice-updated responses with an invalid status
    pub(crate) forkchoice_updated_invalid: Counter,
    /// Number of forkchoice-updated responses with a syncing status
    pub(crate) forkchoice_updated_syncing: Counter,
    /// Number of forkchoice-updated calls that resulted in an error
    pub(crate) forkchoice_updated_error: Counter,
    /// Histogram of forkchoice-updated latency
    pub(crate) forkchoice_updated_latency: Histogram,
    /// Latency of the most recent forkchoice-updated call
    pub(crate) forkchoice_updated_last: Gauge,
    /// Histogram of the time between the latest new payload and the start of the following
    /// forkchoice-updated call
    pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
}
223
224impl ForkchoiceUpdatedMetrics {
225 pub(crate) fn update_response_metrics(
227 &self,
228 start: Instant,
229 latest_new_payload_at: &mut Option<Instant>,
230 has_attrs: bool,
231 result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
232 ) {
233 let elapsed = start.elapsed();
234 match result {
235 Ok(outcome) => match outcome.outcome.forkchoice_status() {
236 ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
237 ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
238 ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
239 },
240 Err(_) => self.forkchoice_updated_error.increment(1),
241 }
242 self.forkchoice_updated_messages.increment(1);
243 if has_attrs {
244 self.forkchoice_with_attributes_updated_messages.increment(1);
245 }
246 self.forkchoice_updated_latency.record(elapsed);
247 self.forkchoice_updated_last.set(elapsed);
248 if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
249 self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
250 }
251 }
252}
253
/// Metrics for new-payload responses; updated by `update_response_metrics`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadStatusMetrics {
    /// Instant at which the latest new-payload call finished being processed
    #[metric(skip)]
    pub(crate) latest_at: Option<Instant>,
    /// Total number of new-payload messages handled
    pub(crate) new_payload_messages: Counter,
    /// Number of new-payload responses with a valid status
    pub(crate) new_payload_valid: Counter,
    /// Number of new-payload responses with an invalid status
    pub(crate) new_payload_invalid: Counter,
    /// Number of new-payload responses with a syncing status
    pub(crate) new_payload_syncing: Counter,
    /// Number of new-payload responses with an accepted status
    pub(crate) new_payload_accepted: Counter,
    /// Number of new-payload calls that resulted in an error
    pub(crate) new_payload_error: Counter,
    /// Histogram of gas used by valid new payloads
    pub(crate) new_payload_total_gas: Histogram,
    /// Histogram of gas per second for valid new payloads
    pub(crate) new_payload_gas_per_second: Histogram,
    /// Gas per second of the most recent valid new payload
    pub(crate) new_payload_gas_per_second_last: Gauge,
    /// Histogram of new-payload latency
    pub(crate) new_payload_latency: Histogram,
    /// Latency of the most recent new-payload call
    pub(crate) new_payload_last: Gauge,
}
289
290impl NewPayloadStatusMetrics {
291 pub(crate) fn update_response_metrics(
293 &mut self,
294 start: Instant,
295 result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
296 gas_used: u64,
297 ) {
298 let finish = Instant::now();
299 let elapsed = finish - start;
300
301 self.latest_at = Some(finish);
302 match result {
303 Ok(outcome) => match outcome.outcome.status {
304 PayloadStatusEnum::Valid => {
305 self.new_payload_valid.increment(1);
306 self.new_payload_total_gas.record(gas_used as f64);
307 let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
308 self.new_payload_gas_per_second.record(gas_per_second);
309 self.new_payload_gas_per_second_last.set(gas_per_second);
310 }
311 PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
312 PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
313 PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
314 },
315 Err(_) => self.new_payload_error.increment(1),
316 }
317 self.new_payload_messages.increment(1);
318 self.new_payload_latency.record(elapsed);
319 self.new_payload_last.set(elapsed);
320 }
321}
322
/// Metrics for block validation.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    /// Total number of storage tries updated by state root calculations
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Total number of parallel state root fallbacks
    pub(crate) state_root_parallel_fallback_total: Counter,
    /// Duration of the latest state root calculation, in seconds
    pub(crate) state_root_duration: Gauge,
    /// Histogram of state root durations, in seconds
    pub(crate) state_root_histogram: Histogram,
    /// Histogram of deferred trie computation durations
    pub(crate) deferred_trie_compute_duration: Histogram,
    /// Histogram of deferred trie wait durations
    pub(crate) deferred_trie_wait_duration: Histogram,
    /// Histogram of trie input durations
    pub(crate) trie_input_duration: Histogram,
    /// Duration of the latest payload validation, in seconds
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation durations, in seconds
    pub(crate) payload_validation_histogram: Histogram,
    /// Histogram of payload processor spawn durations
    pub(crate) spawn_payload_processor: Histogram,
    /// Histogram of post-execution validation durations
    pub(crate) post_execution_validation_duration: Histogram,
    /// Histogram of total block validation durations
    pub(crate) total_duration: Histogram,
}
352
353impl BlockValidationMetrics {
354 pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
356 self.state_root_storage_tries_updated_total
357 .increment(trie_output.storage_tries_ref().len() as u64);
358 self.state_root_duration.set(elapsed_as_secs);
359 self.state_root_histogram.record(elapsed_as_secs);
360 }
361
362 pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
365 self.payload_validation_duration.set(elapsed_as_secs);
366 self.payload_validation_histogram.record(elapsed_as_secs);
367 }
368}
369
/// Metrics for the block buffer.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
pub(crate) struct BlockBufferMetrics {
    /// Number of blocks in the block buffer
    pub blocks: Gauge,
}
377
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// Minimal [`BlockExecutor`] used to drive `execute_metered` in tests.
    ///
    /// It performs no real execution: it only forwards its canned `state` to the installed
    /// state hook and reports fixed gas values.
    struct MockExecutor {
        /// State passed to the hook whenever the mock notifies it.
        state: EvmState,
        /// Hook installed through `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        /// Creates a mock executor with the given state and no hook installed.
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    /// EVM type returned by the mock executor's `finish`.
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        /// No-op: the mock has no pre-execution changes to apply.
        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        /// Notifies the hook (if set) with the canned state and returns a fixed successful
        /// result; the transaction itself is ignored.
        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            if let Some(hook) = self.hook.as_mut() {
                // Report the canned state as if a transaction had produced it.
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            Ok(ResultAndState::new(
                ExecutionResult::Success {
                    reason: SuccessReason::Return,
                    gas_used: 1000, gas_refunded: 0,
                    logs: vec![],
                    output: Output::Call(Bytes::from(vec![])),
                },
                Default::default(),
            ))
        }

        /// Ignores its inputs and reports a fixed gas usage of 1000.
        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(1000)
        }

        /// Notifies the hook (if set) once with the canned state, then returns a freshly built
        /// EVM over an empty database together with a fixed, receipt-less execution result.
        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            if let Some(mut hook) = hook {
                // Report the canned state so the hook is exercised even without transactions.
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            // The EVM is only built here because `finish` has to return one.
            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            Ok((
                // Fixed result: no receipts, default requests, 1000 gas used.
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                    blob_gas_used: 0,
                },
            ))
        }

        /// Stores (or clears) the state hook.
        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        /// Not supported by the mock; always panics.
        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        /// Not supported by the mock; always panics.
        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends a fixed value over a channel every time it is invoked.
    struct ChannelStateHook {
        /// Value sent on each invocation.
        output: i32,
        /// Sending half of the channel observed by the test.
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        /// Sends `output` over the channel; a failed send is ignored.
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            let _ = self.sender.send(self.output);
        }
    }

    /// Creates a [`DebuggingRecorder`], installs it (panicking if installation fails) and
    /// returns the snapshotter that reads back what it captured.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    /// Runs `execute_metered` on a default (empty) block and checks that, if the caller's
    /// hook was invoked, it sent the expected value.
    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        // The execution result itself is not inspected; only the hook side effect matters.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            input.transaction_count(),
            state_hook,
        );

        match rx.try_recv() {
            // The hook ran: it must have sent exactly the configured value.
            Ok(actual_output) => assert_eq!(actual_output, expected_output),
            Err(_) => {
                // NOTE(review): an uninvoked hook is deliberately tolerated here, so this
                // test cannot fail for that reason - confirm this leniency is intended.
            }
        }
    }

    /// Runs `execute_metered` with a non-empty mock state and checks that at least one
    /// metric whose name starts with `sync.execution` shows up in the recorder snapshot.
    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Touch a few executor metrics up front, presumably so they are registered with the
        // recorder before the snapshot is taken.
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        let state = {
            // One account (at the default address) with a single storage slot.
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        // The execution result itself is not inspected; only the recorded metrics matter.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            input.transaction_count(),
            state_hook,
        );

        let snapshot = snapshotter.snapshot().into_vec();

        let mut found_metrics = false;
        // Only the presence of a matching metric name is checked, not its value.
        for (key, _unit, _desc, _value) in snapshot {
            let metric_name = key.key().name();
            if metric_name.starts_with("sync.execution") {
                found_metrics = true;
                break;
            }
        }

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}