reth_engine_tree/tree/
metrics.rs

1use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome};
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4    block::{BlockExecutor, ExecutableTx},
5    Evm,
6};
7use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
8use core::borrow::BorrowMut;
9use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
10use reth_errors::{BlockExecutionError, ProviderError};
11use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
12use reth_execution_types::BlockExecutionOutput;
13use reth_metrics::{
14    metrics::{Counter, Gauge, Histogram},
15    Metrics,
16};
17use reth_primitives_traits::SignedTransaction;
18use reth_trie::updates::TrieUpdates;
19use revm::database::{states::bundle_state::BundleRetention, State};
20use revm_primitives::Address;
21use std::time::Instant;
22use tracing::{debug_span, trace};
23
24/// Metrics for the `EngineApi`.
25#[derive(Debug, Default)]
26pub(crate) struct EngineApiMetrics {
27    /// Engine API-specific metrics.
28    pub(crate) engine: EngineMetrics,
29    /// Block executor metrics.
30    pub(crate) executor: ExecutorMetrics,
31    /// Metrics for block validation
32    pub(crate) block_validation: BlockValidationMetrics,
33    /// Canonical chain and reorg related metrics
34    pub tree: TreeMetrics,
35}
36
37impl EngineApiMetrics {
38    /// Helper function for metered execution
39    fn metered<F, R>(&self, f: F) -> R
40    where
41        F: FnOnce() -> (u64, R),
42    {
43        // Execute the block and record the elapsed time.
44        let execute_start = Instant::now();
45        let (gas_used, output) = f();
46        let execution_duration = execute_start.elapsed().as_secs_f64();
47
48        // Update gas metrics.
49        self.executor.gas_processed_total.increment(gas_used);
50        self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
51        self.executor.gas_used_histogram.record(gas_used as f64);
52        self.executor.execution_histogram.record(execution_duration);
53        self.executor.execution_duration.set(execution_duration);
54
55        output
56    }
57
58    /// Execute the given block using the provided [`BlockExecutor`] and update metrics for the
59    /// execution.
60    ///
61    /// This method updates metrics for execution time, gas usage, and the number
62    /// of accounts, storage slots and bytecodes loaded and updated.
63    pub(crate) fn execute_metered<E, DB>(
64        &self,
65        executor: E,
66        mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
67        transaction_count: usize,
68        state_hook: Box<dyn OnStateHook>,
69    ) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
70    where
71        DB: alloy_evm::Database,
72        E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
73    {
74        // clone here is cheap, all the metrics are Option<Arc<_>>. additionally
75        // they are globally registered so that the data recorded in the hook will
76        // be accessible.
77        let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
78
79        let mut senders = Vec::with_capacity(transaction_count);
80        let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
81
82        let f = || {
83            let start = Instant::now();
84            debug_span!(target: "engine::tree", "pre execution")
85                .entered()
86                .in_scope(|| executor.apply_pre_execution_changes())?;
87            self.executor.pre_execution_histogram.record(start.elapsed());
88
89            let exec_span = debug_span!(target: "engine::tree", "execution").entered();
90            loop {
91                let start = Instant::now();
92                let Some(tx) = transactions.next() else { break };
93                self.executor.transaction_wait_histogram.record(start.elapsed());
94
95                let tx = tx?;
96                senders.push(*tx.signer());
97
98                let span =
99                    debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
100                let enter = span.entered();
101                trace!(target: "engine::tree", "Executing transaction");
102                let start = Instant::now();
103                let gas_used = executor.execute_transaction(tx)?;
104                self.executor.transaction_execution_histogram.record(start.elapsed());
105
106                // record the tx gas used
107                enter.record("gas_used", gas_used);
108            }
109            drop(exec_span);
110
111            let start = Instant::now();
112            let result = debug_span!(target: "engine::tree", "finish")
113                .entered()
114                .in_scope(|| executor.finish())
115                .map(|(evm, result)| (evm.into_db(), result));
116            self.executor.post_execution_histogram.record(start.elapsed());
117
118            result
119        };
120
121        // Use metered to execute and track timing/gas metrics
122        let (mut db, result) = self.metered(|| {
123            let res = f();
124            let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
125            (gas_used, res)
126        })?;
127
128        // merge transitions into bundle state
129        debug_span!(target: "engine::tree", "merge transitions")
130            .entered()
131            .in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts));
132        let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
133
134        // Update the metrics for the number of accounts, storage slots and bytecodes updated
135        let accounts = output.state.state.len();
136        let storage_slots =
137            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
138        let bytecodes = output.state.contracts.len();
139
140        self.executor.accounts_updated_histogram.record(accounts as f64);
141        self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
142        self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
143
144        Ok((output, senders))
145    }
146}
147
148/// Metrics for the entire blockchain tree
149#[derive(Metrics)]
150#[metrics(scope = "blockchain_tree")]
151pub(crate) struct TreeMetrics {
152    /// The highest block number in the canonical chain
153    pub canonical_chain_height: Gauge,
154    /// The number of reorgs
155    pub reorgs: Counter,
156    /// The latest reorg depth
157    pub latest_reorg_depth: Gauge,
158    /// The current safe block height (this is required by optimism)
159    pub safe_block_height: Gauge,
160    /// The current finalized block height (this is required by optimism)
161    pub finalized_block_height: Gauge,
162}
163
164/// Metrics for the `EngineApi`.
165#[derive(Metrics)]
166#[metrics(scope = "consensus.engine.beacon")]
167pub(crate) struct EngineMetrics {
168    /// Engine API forkchoiceUpdated response type metrics
169    #[metric(skip)]
170    pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
171    /// Engine API newPayload response type metrics
172    #[metric(skip)]
173    pub(crate) new_payload: NewPayloadStatusMetrics,
174    /// How many executed blocks are currently stored.
175    pub(crate) executed_blocks: Gauge,
176    /// How many already executed blocks were directly inserted into the tree.
177    pub(crate) inserted_already_executed_blocks: Counter,
178    /// The number of times the pipeline was run.
179    pub(crate) pipeline_runs: Counter,
180    /// Newly arriving block hash is not present in executed blocks cache storage
181    pub(crate) executed_new_block_cache_miss: Counter,
182    /// Histogram of persistence operation durations (in seconds)
183    pub(crate) persistence_duration: Histogram,
184    /// Tracks the how often we failed to deliver a newPayload response.
185    ///
186    /// This effectively tracks how often the message sender dropped the channel and indicates a CL
187    /// request timeout (e.g. it took more than 8s to send the response and the CL terminated the
188    /// request which resulted in a closed channel).
189    pub(crate) failed_new_payload_response_deliveries: Counter,
190    /// Tracks the how often we failed to deliver a forkchoice update response.
191    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
192    /// block insert duration
193    pub(crate) block_insert_total_duration: Histogram,
194}
195
/// Metrics for engine forkchoiceUpdated responses.
///
/// Embedded in `EngineMetrics` as its `forkchoice_updated` field and registered under the same
/// `consensus.engine.beacon` scope. Updated by `ForkchoiceUpdatedMetrics::update_response_metrics`.
// NOTE(review): `#[derive(Metrics)]` presumably turns the field doc comments below into the
// registered metric descriptions; treat them as user-visible text and confirm before editing.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ForkchoiceUpdatedMetrics {
    /// The total count of forkchoice updated messages received.
    pub(crate) forkchoice_updated_messages: Counter,
    /// The total count of forkchoice updated messages with payload received.
    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
    // Exactly one of the four outcome counters below is incremented per call.
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Valid`](ForkchoiceStatus::Valid).
    pub(crate) forkchoice_updated_valid: Counter,
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Invalid`](ForkchoiceStatus::Invalid).
    pub(crate) forkchoice_updated_invalid: Counter,
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Syncing`](ForkchoiceStatus::Syncing).
    pub(crate) forkchoice_updated_syncing: Counter,
    /// The total count of forkchoice updated messages that were unsuccessful, i.e. we responded
    /// with an error type that is not a [`PayloadStatusEnum`].
    pub(crate) forkchoice_updated_error: Counter,
    // The latency histogram and gauge below are updated together on every call.
    /// Latency for the forkchoice updated calls.
    pub(crate) forkchoice_updated_latency: Histogram,
    /// Latency for the last forkchoice updated call.
    pub(crate) forkchoice_updated_last: Gauge,
    // Only recorded when a preceding newPayload finish time is available; that time is consumed.
    /// Time diff between new payload call response and the next forkchoice updated call request.
    pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
}
223
224impl ForkchoiceUpdatedMetrics {
225    /// Increment the forkchoiceUpdated counter based on the given result
226    pub(crate) fn update_response_metrics(
227        &self,
228        start: Instant,
229        latest_new_payload_at: &mut Option<Instant>,
230        has_attrs: bool,
231        result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
232    ) {
233        let elapsed = start.elapsed();
234        match result {
235            Ok(outcome) => match outcome.outcome.forkchoice_status() {
236                ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
237                ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
238                ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
239            },
240            Err(_) => self.forkchoice_updated_error.increment(1),
241        }
242        self.forkchoice_updated_messages.increment(1);
243        if has_attrs {
244            self.forkchoice_with_attributes_updated_messages.increment(1);
245        }
246        self.forkchoice_updated_latency.record(elapsed);
247        self.forkchoice_updated_last.set(elapsed);
248        if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
249            self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
250        }
251    }
252}
253
/// Metrics for engine newPayload responses.
///
/// Embedded in `EngineMetrics` as its `new_payload` field and registered under the same
/// `consensus.engine.beacon` scope. Updated by `NewPayloadStatusMetrics::update_response_metrics`.
// NOTE(review): `#[derive(Metrics)]` presumably turns the field doc comments below into the
// registered metric descriptions; treat them as user-visible text and confirm before editing.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadStatusMetrics {
    // Not a metric: overwritten by `update_response_metrics` on every call, success or error.
    /// Finish time of the latest new payload call.
    #[metric(skip)]
    pub(crate) latest_at: Option<Instant>,
    /// The total count of new payload messages received.
    pub(crate) new_payload_messages: Counter,
    // Exactly one of the five outcome counters below is incremented per call.
    /// The total count of new payload messages that we responded to with
    /// [Valid](PayloadStatusEnum::Valid).
    pub(crate) new_payload_valid: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Invalid](PayloadStatusEnum::Invalid).
    pub(crate) new_payload_invalid: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Syncing](PayloadStatusEnum::Syncing).
    pub(crate) new_payload_syncing: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Accepted](PayloadStatusEnum::Accepted).
    pub(crate) new_payload_accepted: Counter,
    /// The total count of new payload messages that were unsuccessful, i.e. we responded with an
    /// error type that is not a [`PayloadStatusEnum`].
    pub(crate) new_payload_error: Counter,
    // The three gas metrics below are only updated for `Valid` responses.
    /// The total gas of valid new payload messages received.
    pub(crate) new_payload_total_gas: Histogram,
    /// The gas per second of valid new payload messages received.
    pub(crate) new_payload_gas_per_second: Histogram,
    /// The gas per second for the last new payload call.
    pub(crate) new_payload_gas_per_second_last: Gauge,
    // The latency histogram and gauge below are updated on every call, including errors.
    /// Latency for the new payload calls.
    pub(crate) new_payload_latency: Histogram,
    /// Latency for the last new payload call.
    pub(crate) new_payload_last: Gauge,
}
289
290impl NewPayloadStatusMetrics {
291    /// Increment the newPayload counter based on the given result
292    pub(crate) fn update_response_metrics(
293        &mut self,
294        start: Instant,
295        result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
296        gas_used: u64,
297    ) {
298        let finish = Instant::now();
299        let elapsed = finish - start;
300
301        self.latest_at = Some(finish);
302        match result {
303            Ok(outcome) => match outcome.outcome.status {
304                PayloadStatusEnum::Valid => {
305                    self.new_payload_valid.increment(1);
306                    self.new_payload_total_gas.record(gas_used as f64);
307                    let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
308                    self.new_payload_gas_per_second.record(gas_per_second);
309                    self.new_payload_gas_per_second_last.set(gas_per_second);
310                }
311                PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
312                PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
313                PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
314            },
315            Err(_) => self.new_payload_error.increment(1),
316        }
317        self.new_payload_messages.increment(1);
318        self.new_payload_latency.record(elapsed);
319        self.new_payload_last.set(elapsed);
320    }
321}
322
/// Metrics for non-execution related block validation.
///
/// Covers state root, deferred trie, trie input, payload validation, payload processor and
/// post-execution validation timings, registered under the `sync.block_validation` scope.
// NOTE(review): `#[derive(Metrics)]` presumably turns the field doc comments below into the
// registered metric descriptions; treat them as user-visible text and confirm before editing.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    // Incremented by `record_state_root` with the number of storage tries in the trie updates.
    /// Total number of storage tries updated in the state root calculation
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Total number of times the parallel state root computation fell back to regular.
    pub(crate) state_root_parallel_fallback_total: Counter,
    // The gauge/histogram pair below is updated together by `record_state_root`.
    /// Latest state root duration, ie the time spent blocked waiting for the state root.
    pub(crate) state_root_duration: Gauge,
    /// Histogram for state root duration ie the time spent blocked waiting for the state root
    pub(crate) state_root_histogram: Histogram,
    /// Histogram of deferred trie computation duration.
    pub(crate) deferred_trie_compute_duration: Histogram,
    /// Histogram of time spent waiting for deferred trie data to become available.
    pub(crate) deferred_trie_wait_duration: Histogram,
    /// Trie input computation duration
    pub(crate) trie_input_duration: Histogram,
    // The gauge/histogram pair below is updated together by `record_payload_validation`.
    /// Payload conversion and validation latency
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation latency
    pub(crate) payload_validation_histogram: Histogram,
    /// Payload processor spawning duration
    pub(crate) spawn_payload_processor: Histogram,
    /// Post-execution validation duration
    pub(crate) post_execution_validation_duration: Histogram,
    /// Total duration of the new payload call
    pub(crate) total_duration: Histogram,
}
352
353impl BlockValidationMetrics {
354    /// Records a new state root time, updating both the histogram and state root gauge
355    pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
356        self.state_root_storage_tries_updated_total
357            .increment(trie_output.storage_tries_ref().len() as u64);
358        self.state_root_duration.set(elapsed_as_secs);
359        self.state_root_histogram.record(elapsed_as_secs);
360    }
361
362    /// Records a new payload validation time, updating both the histogram and the payload
363    /// validation gauge
364    pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
365        self.payload_validation_duration.set(elapsed_as_secs);
366        self.payload_validation_histogram.record(elapsed_as_secs);
367    }
368}
369
370/// Metrics for the blockchain tree block buffer
371#[derive(Metrics)]
372#[metrics(scope = "blockchain_tree.block_buffer")]
373pub(crate) struct BlockBufferMetrics {
374    /// Total blocks in the block buffer
375    pub blocks: Gauge,
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// A simple mock executor for testing that doesn't require complex EVM setup.
    ///
    /// It never fails: every transaction "uses" 1000 gas, and `finish` invokes the installed
    /// state hook with `state` before returning a fresh EVM over an empty database.
    struct MockExecutor {
        state: EvmState,
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    // Mock Evm type for testing
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            // Call hook with our mock state for each transaction
            if let Some(hook) = self.hook.as_mut() {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            Ok(ResultAndState::new(
                ExecutionResult::Success {
                    reason: SuccessReason::Return,
                    gas_used: 1000, // Mock gas used
                    gas_refunded: 0,
                    logs: vec![],
                    output: Output::Call(Bytes::from(vec![])),
                },
                Default::default(),
            ))
        }

        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(1000)
        }

        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            // Call hook with our mock state
            if let Some(mut hook) = hook {
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            // Create a mock EVM
            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            // Return successful result like the original tests
            Ok((
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                    blob_gas_used: 0,
                },
            ))
        }

        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends a fixed value over a channel every time it is invoked.
    struct ChannelStateHook {
        output: i32,
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a global debugging recorder and returns its snapshotter.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        // The mock executor never fails, and its `finish` always invokes the installed
        // (metered) hook, so both execution success and hook delivery must hold.
        let result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            input.transaction_count(),
            state_hook,
        );
        assert!(result.is_ok(), "mock execution should succeed");

        let actual_output = rx.try_recv().expect("state hook should have been called");
        assert_eq!(actual_output, expected_output);
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // Create a state with some data
        let state = {
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        // Execute against the mock; the metered run and the wrapped state hook record executor
        // metrics without any manual pre-population.
        let result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            input.transaction_count(),
            state_hook,
        );
        assert!(result.is_ok(), "mock execution should succeed");

        let snapshot = snapshotter.snapshot().into_vec();

        // Verify that metrics were registered
        let found_metrics = snapshot
            .iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}