1use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome};
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4 block::{BlockExecutor, ExecutableTx},
5 Evm,
6};
7use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
8use core::borrow::BorrowMut;
9use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
10use reth_errors::{BlockExecutionError, ProviderError};
11use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
12use reth_execution_types::BlockExecutionOutput;
13use reth_metrics::{
14 metrics::{Counter, Gauge, Histogram},
15 Metrics,
16};
17use reth_primitives_traits::SignedTransaction;
18use reth_trie::updates::TrieUpdates;
19use revm::database::{states::bundle_state::BundleRetention, State};
20use revm_primitives::Address;
21use std::time::Instant;
22use tracing::{debug_span, trace};
23
24#[derive(Debug, Default)]
26pub(crate) struct EngineApiMetrics {
27 pub(crate) engine: EngineMetrics,
29 pub(crate) executor: ExecutorMetrics,
31 pub(crate) block_validation: BlockValidationMetrics,
33 pub tree: TreeMetrics,
35}
36
37impl EngineApiMetrics {
38 fn metered<F, R>(&self, f: F) -> R
40 where
41 F: FnOnce() -> (u64, R),
42 {
43 let execute_start = Instant::now();
45 let (gas_used, output) = f();
46 let execution_duration = execute_start.elapsed().as_secs_f64();
47
48 self.executor.gas_processed_total.increment(gas_used);
50 self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
51 self.executor.gas_used_histogram.record(gas_used as f64);
52 self.executor.execution_histogram.record(execution_duration);
53 self.executor.execution_duration.set(execution_duration);
54
55 output
56 }
57
58 pub(crate) fn execute_metered<E, DB>(
64 &self,
65 executor: E,
66 mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
67 state_hook: Box<dyn OnStateHook>,
68 ) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
69 where
70 DB: alloy_evm::Database,
71 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
72 {
73 let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
77
78 let mut senders = Vec::new();
79 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
80
81 let f = || {
82 let start = Instant::now();
83 debug_span!(target: "engine::tree", "pre execution")
84 .entered()
85 .in_scope(|| executor.apply_pre_execution_changes())?;
86 self.executor.pre_execution_histogram.record(start.elapsed());
87
88 let exec_span = debug_span!(target: "engine::tree", "execution").entered();
89 loop {
90 let start = Instant::now();
91 let Some(tx) = transactions.next() else { break };
92 self.executor.transaction_wait_histogram.record(start.elapsed());
93
94 let tx = tx?;
95 senders.push(*tx.signer());
96
97 let span =
98 debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
99 let enter = span.entered();
100 trace!(target: "engine::tree", "Executing transaction");
101 let start = Instant::now();
102 let gas_used = executor.execute_transaction(tx)?;
103 self.executor.transaction_execution_histogram.record(start.elapsed());
104
105 enter.record("gas_used", gas_used);
107 }
108 drop(exec_span);
109
110 let start = Instant::now();
111 let result = debug_span!(target: "engine::tree", "finish")
112 .entered()
113 .in_scope(|| executor.finish())
114 .map(|(evm, result)| (evm.into_db(), result));
115 self.executor.post_execution_histogram.record(start.elapsed());
116
117 result
118 };
119
120 let (mut db, result) = self.metered(|| {
122 let res = f();
123 let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
124 (gas_used, res)
125 })?;
126
127 debug_span!(target: "engine::tree", "merge transitions")
129 .entered()
130 .in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts));
131 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
132
133 let accounts = output.state.state.len();
135 let storage_slots =
136 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
137 let bytecodes = output.state.contracts.len();
138
139 self.executor.accounts_updated_histogram.record(accounts as f64);
140 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
141 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
142
143 Ok((output, senders))
144 }
145}
146
147#[derive(Metrics)]
149#[metrics(scope = "blockchain_tree")]
150pub(crate) struct TreeMetrics {
151 pub canonical_chain_height: Gauge,
153 pub reorgs: Counter,
155 pub latest_reorg_depth: Gauge,
157 pub safe_block_height: Gauge,
159 pub finalized_block_height: Gauge,
161}
162
163#[derive(Metrics)]
165#[metrics(scope = "consensus.engine.beacon")]
166pub(crate) struct EngineMetrics {
167 #[metric(skip)]
169 pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
170 #[metric(skip)]
172 pub(crate) new_payload: NewPayloadStatusMetrics,
173 pub(crate) executed_blocks: Gauge,
175 pub(crate) inserted_already_executed_blocks: Counter,
177 pub(crate) pipeline_runs: Counter,
179 pub(crate) executed_new_block_cache_miss: Counter,
181 pub(crate) persistence_duration: Histogram,
183 pub(crate) failed_new_payload_response_deliveries: Counter,
189 pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
191 pub(crate) block_insert_total_duration: Histogram,
193}
194
195#[derive(Metrics)]
197#[metrics(scope = "consensus.engine.beacon")]
198pub(crate) struct ForkchoiceUpdatedMetrics {
199 pub(crate) forkchoice_updated_messages: Counter,
201 pub(crate) forkchoice_with_attributes_updated_messages: Counter,
203 pub(crate) forkchoice_updated_valid: Counter,
206 pub(crate) forkchoice_updated_invalid: Counter,
209 pub(crate) forkchoice_updated_syncing: Counter,
212 pub(crate) forkchoice_updated_error: Counter,
215 pub(crate) forkchoice_updated_latency: Histogram,
217 pub(crate) forkchoice_updated_last: Gauge,
219 pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
221}
222
223impl ForkchoiceUpdatedMetrics {
224 pub(crate) fn update_response_metrics(
226 &self,
227 start: Instant,
228 latest_new_payload_at: &mut Option<Instant>,
229 has_attrs: bool,
230 result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
231 ) {
232 let elapsed = start.elapsed();
233 match result {
234 Ok(outcome) => match outcome.outcome.forkchoice_status() {
235 ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
236 ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
237 ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
238 },
239 Err(_) => self.forkchoice_updated_error.increment(1),
240 }
241 self.forkchoice_updated_messages.increment(1);
242 if has_attrs {
243 self.forkchoice_with_attributes_updated_messages.increment(1);
244 }
245 self.forkchoice_updated_latency.record(elapsed);
246 self.forkchoice_updated_last.set(elapsed);
247 if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
248 self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
249 }
250 }
251}
252
253#[derive(Metrics)]
255#[metrics(scope = "consensus.engine.beacon")]
256pub(crate) struct NewPayloadStatusMetrics {
257 #[metric(skip)]
259 pub(crate) latest_at: Option<Instant>,
260 pub(crate) new_payload_messages: Counter,
262 pub(crate) new_payload_valid: Counter,
265 pub(crate) new_payload_invalid: Counter,
268 pub(crate) new_payload_syncing: Counter,
271 pub(crate) new_payload_accepted: Counter,
274 pub(crate) new_payload_error: Counter,
277 pub(crate) new_payload_total_gas: Histogram,
279 pub(crate) new_payload_gas_per_second: Histogram,
281 pub(crate) new_payload_gas_per_second_last: Gauge,
283 pub(crate) new_payload_latency: Histogram,
285 pub(crate) new_payload_last: Gauge,
287}
288
289impl NewPayloadStatusMetrics {
290 pub(crate) fn update_response_metrics(
292 &mut self,
293 start: Instant,
294 result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
295 gas_used: u64,
296 ) {
297 let finish = Instant::now();
298 let elapsed = finish - start;
299
300 self.latest_at = Some(finish);
301 match result {
302 Ok(outcome) => match outcome.outcome.status {
303 PayloadStatusEnum::Valid => {
304 self.new_payload_valid.increment(1);
305 self.new_payload_total_gas.record(gas_used as f64);
306 let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
307 self.new_payload_gas_per_second.record(gas_per_second);
308 self.new_payload_gas_per_second_last.set(gas_per_second);
309 }
310 PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
311 PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
312 PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
313 },
314 Err(_) => self.new_payload_error.increment(1),
315 }
316 self.new_payload_messages.increment(1);
317 self.new_payload_latency.record(elapsed);
318 self.new_payload_last.set(elapsed);
319 }
320}
321
322#[derive(Metrics)]
324#[metrics(scope = "sync.block_validation")]
325pub(crate) struct BlockValidationMetrics {
326 pub(crate) state_root_storage_tries_updated_total: Counter,
328 pub(crate) state_root_parallel_fallback_total: Counter,
330 pub(crate) state_root_duration: Gauge,
332 pub(crate) state_root_histogram: Histogram,
334 pub(crate) deferred_trie_compute_duration: Histogram,
336 pub(crate) deferred_trie_wait_duration: Histogram,
338 pub(crate) trie_input_duration: Histogram,
340 pub(crate) payload_validation_duration: Gauge,
342 pub(crate) payload_validation_histogram: Histogram,
344 pub(crate) spawn_payload_processor: Histogram,
346 pub(crate) post_execution_validation_duration: Histogram,
348 pub(crate) total_duration: Histogram,
350}
351
352impl BlockValidationMetrics {
353 pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
355 self.state_root_storage_tries_updated_total
356 .increment(trie_output.storage_tries_ref().len() as u64);
357 self.state_root_duration.set(elapsed_as_secs);
358 self.state_root_histogram.record(elapsed_as_secs);
359 }
360
361 pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
364 self.payload_validation_duration.set(elapsed_as_secs);
365 self.payload_validation_histogram.record(elapsed_as_secs);
366 }
367}
368
369#[derive(Metrics)]
371#[metrics(scope = "blockchain_tree.block_buffer")]
372pub(crate) struct BlockBufferMetrics {
373 pub blocks: Gauge,
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380 use alloy_eips::eip7685::Requests;
381 use alloy_evm::block::StateChangeSource;
382 use alloy_primitives::{B256, U256};
383 use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
384 use reth_ethereum_primitives::{Receipt, TransactionSigned};
385 use reth_evm_ethereum::EthEvm;
386 use reth_execution_types::BlockExecutionResult;
387 use reth_primitives_traits::RecoveredBlock;
388 use revm::{
389 context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
390 database::State,
391 database_interface::EmptyDB,
392 inspector::NoOpInspector,
393 state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
394 Context, MainBuilder, MainContext,
395 };
396 use revm_primitives::Bytes;
397 use std::sync::mpsc;
398
399 struct MockExecutor {
401 state: EvmState,
402 hook: Option<Box<dyn OnStateHook>>,
403 }
404
405 impl MockExecutor {
406 fn new(state: EvmState) -> Self {
407 Self { state, hook: None }
408 }
409 }
410
411 type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;
413
414 impl BlockExecutor for MockExecutor {
415 type Transaction = TransactionSigned;
416 type Receipt = Receipt;
417 type Evm = MockEvm;
418
419 fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
420 Ok(())
421 }
422
423 fn execute_transaction_without_commit(
424 &mut self,
425 _tx: impl ExecutableTx<Self>,
426 ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
427 if let Some(hook) = self.hook.as_mut() {
429 hook.on_state(StateChangeSource::Transaction(0), &self.state);
430 }
431
432 Ok(ResultAndState::new(
433 ExecutionResult::Success {
434 reason: SuccessReason::Return,
435 gas_used: 1000, gas_refunded: 0,
437 logs: vec![],
438 output: Output::Call(Bytes::from(vec![])),
439 },
440 Default::default(),
441 ))
442 }
443
444 fn commit_transaction(
445 &mut self,
446 _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
447 _tx: impl ExecutableTx<Self>,
448 ) -> Result<u64, BlockExecutionError> {
449 Ok(1000)
450 }
451
452 fn finish(
453 self,
454 ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
455 let Self { hook, state, .. } = self;
456
457 if let Some(mut hook) = hook {
459 hook.on_state(StateChangeSource::Transaction(0), &state);
460 }
461
462 let db = State::builder()
464 .with_database(EmptyDB::default())
465 .with_bundle_update()
466 .without_state_clear()
467 .build();
468 let evm = EthEvm::new(
469 Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
470 false,
471 );
472
473 Ok((
475 evm,
476 BlockExecutionResult {
477 receipts: vec![],
478 requests: Requests::default(),
479 gas_used: 1000,
480 blob_gas_used: 0,
481 },
482 ))
483 }
484
485 fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
486 self.hook = hook;
487 }
488
489 fn evm(&self) -> &Self::Evm {
490 panic!("Mock executor evm() not implemented")
491 }
492
493 fn evm_mut(&mut self) -> &mut Self::Evm {
494 panic!("Mock executor evm_mut() not implemented")
495 }
496 }
497
498 struct ChannelStateHook {
499 output: i32,
500 sender: mpsc::Sender<i32>,
501 }
502
503 impl OnStateHook for ChannelStateHook {
504 fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
505 let _ = self.sender.send(self.output);
506 }
507 }
508
509 fn setup_test_recorder() -> Snapshotter {
510 let recorder = DebuggingRecorder::new();
511 let snapshotter = recorder.snapshotter();
512 recorder.install().unwrap();
513 snapshotter
514 }
515
516 #[test]
517 fn test_executor_metrics_hook_called() {
518 let metrics = EngineApiMetrics::default();
519 let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
520
521 let (tx, rx) = mpsc::channel();
522 let expected_output = 42;
523 let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });
524
525 let state = EvmState::default();
526 let executor = MockExecutor::new(state);
527
528 let _result = metrics.execute_metered::<_, EmptyDB>(
530 executor,
531 input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
532 state_hook,
533 );
534
535 match rx.try_recv() {
537 Ok(actual_output) => assert_eq!(actual_output, expected_output),
538 Err(_) => {
539 }
542 }
543 }
544
545 #[test]
546 fn test_executor_metrics_hook_metrics_recorded() {
547 let snapshotter = setup_test_recorder();
548 let metrics = EngineApiMetrics::default();
549
550 metrics.executor.gas_processed_total.increment(0);
552 metrics.executor.gas_per_second.set(0.0);
553 metrics.executor.gas_used_histogram.record(0.0);
554
555 let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
556
557 let (tx, _rx) = mpsc::channel();
558 let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });
559
560 let state = {
562 let mut state = EvmState::default();
563 let storage =
564 EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
565 state.insert(
566 Default::default(),
567 Account {
568 info: AccountInfo {
569 balance: U256::from(100),
570 nonce: 10,
571 code_hash: B256::random(),
572 code: Default::default(),
573 },
574 storage,
575 status: AccountStatus::default(),
576 transaction_id: 0,
577 },
578 );
579 state
580 };
581
582 let executor = MockExecutor::new(state);
583
584 let _result = metrics.execute_metered::<_, EmptyDB>(
586 executor,
587 input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
588 state_hook,
589 );
590
591 let snapshot = snapshotter.snapshot().into_vec();
592
593 let mut found_metrics = false;
595 for (key, _unit, _desc, _value) in snapshot {
596 let metric_name = key.key().name();
597 if metric_name.starts_with("sync.execution") {
598 found_metrics = true;
599 break;
600 }
601 }
602
603 assert!(found_metrics, "Expected to find sync.execution metrics");
604 }
605}