reth_engine_tree/tree/
metrics.rs

1use crate::tree::MeteredStateHook;
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4    block::{BlockExecutor, ExecutableTx},
5    Evm,
6};
7use core::borrow::BorrowMut;
8use reth_errors::BlockExecutionError;
9use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
10use reth_execution_types::BlockExecutionOutput;
11use reth_metrics::{
12    metrics::{Counter, Gauge, Histogram},
13    Metrics,
14};
15use reth_primitives_traits::SignedTransaction;
16use reth_trie::updates::TrieUpdates;
17use revm::database::{states::bundle_state::BundleRetention, State};
18use std::time::Instant;
19use tracing::{debug_span, trace};
20
21/// Metrics for the `EngineApi`.
22#[derive(Debug, Default)]
23pub(crate) struct EngineApiMetrics {
24    /// Engine API-specific metrics.
25    pub(crate) engine: EngineMetrics,
26    /// Block executor metrics.
27    pub(crate) executor: ExecutorMetrics,
28    /// Metrics for block validation
29    pub(crate) block_validation: BlockValidationMetrics,
30    /// Canonical chain and reorg related metrics
31    pub tree: TreeMetrics,
32}
33
34impl EngineApiMetrics {
35    /// Helper function for metered execution
36    fn metered<F, R>(&self, f: F) -> R
37    where
38        F: FnOnce() -> (u64, R),
39    {
40        // Execute the block and record the elapsed time.
41        let execute_start = Instant::now();
42        let (gas_used, output) = f();
43        let execution_duration = execute_start.elapsed().as_secs_f64();
44
45        // Update gas metrics.
46        self.executor.gas_processed_total.increment(gas_used);
47        self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
48        self.executor.gas_used_histogram.record(gas_used as f64);
49        self.executor.execution_histogram.record(execution_duration);
50        self.executor.execution_duration.set(execution_duration);
51
52        output
53    }
54
55    /// Execute the given block using the provided [`BlockExecutor`] and update metrics for the
56    /// execution.
57    ///
58    /// This method updates metrics for execution time, gas usage, and the number
59    /// of accounts, storage slots and bytecodes loaded and updated.
60    pub(crate) fn execute_metered<E, DB>(
61        &self,
62        executor: E,
63        transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
64        state_hook: Box<dyn OnStateHook>,
65    ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
66    where
67        DB: alloy_evm::Database,
68        E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
69    {
70        // clone here is cheap, all the metrics are Option<Arc<_>>. additionally
71        // they are globally registered so that the data recorded in the hook will
72        // be accessible.
73        let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
74
75        let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
76
77        let f = || {
78            executor.apply_pre_execution_changes()?;
79            for tx in transactions {
80                let tx = tx?;
81                let span =
82                    debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
83                let _enter = span.enter();
84                trace!(target: "engine::tree", "Executing transaction");
85                executor.execute_transaction(tx)?;
86            }
87            executor.finish().map(|(evm, result)| (evm.into_db(), result))
88        };
89
90        // Use metered to execute and track timing/gas metrics
91        let (mut db, result) = self.metered(|| {
92            let res = f();
93            let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
94            (gas_used, res)
95        })?;
96
97        // merge transitions into bundle state
98        db.borrow_mut().merge_transitions(BundleRetention::Reverts);
99        let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
100
101        // Update the metrics for the number of accounts, storage slots and bytecodes updated
102        let accounts = output.state.state.len();
103        let storage_slots =
104            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
105        let bytecodes = output.state.contracts.len();
106
107        self.executor.accounts_updated_histogram.record(accounts as f64);
108        self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
109        self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
110
111        Ok(output)
112    }
113}
114
// Uses the `blockchain_tree` metrics scope (see the `#[metrics]` attribute below).
// NOTE(review): `#[derive(Metrics)]` presumably uses each field's doc comment as that metric's
// description, so rewording a field doc may change exported text — confirm before editing.
/// Metrics for the entire blockchain tree
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub(crate) struct TreeMetrics {
    /// The highest block number in the canonical chain
    pub canonical_chain_height: Gauge,
    /// The number of reorgs
    pub reorgs: Counter,
    /// The latest reorg depth
    pub latest_reorg_depth: Gauge,
    /// The current safe block height (this is required by optimism)
    pub safe_block_height: Gauge,
    /// The current finalized block height (this is required by optimism)
    pub finalized_block_height: Gauge,
}
130
131/// Metrics for the `EngineApi`.
132#[derive(Metrics)]
133#[metrics(scope = "consensus.engine.beacon")]
134pub(crate) struct EngineMetrics {
135    /// How many executed blocks are currently stored.
136    pub(crate) executed_blocks: Gauge,
137    /// How many already executed blocks were directly inserted into the tree.
138    pub(crate) inserted_already_executed_blocks: Counter,
139    /// The number of times the pipeline was run.
140    pub(crate) pipeline_runs: Counter,
141    /// The total count of forkchoice updated messages received.
142    pub(crate) forkchoice_updated_messages: Counter,
143    /// The total count of forkchoice updated messages with payload received.
144    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
145    /// Newly arriving block hash is not present in executed blocks cache storage
146    pub(crate) executed_new_block_cache_miss: Counter,
147    /// The total count of new payload messages received.
148    pub(crate) new_payload_messages: Counter,
149    /// Histogram of persistence operation durations (in seconds)
150    pub(crate) persistence_duration: Histogram,
151    /// Tracks the how often we failed to deliver a newPayload response.
152    ///
153    /// This effectively tracks how often the message sender dropped the channel and indicates a CL
154    /// request timeout (e.g. it took more than 8s to send the response and the CL terminated the
155    /// request which resulted in a closed channel).
156    pub(crate) failed_new_payload_response_deliveries: Counter,
157    /// Tracks the how often we failed to deliver a forkchoice update response.
158    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
159    /// block insert duration
160    pub(crate) block_insert_total_duration: Histogram,
161}
162
// Uses the `sync.block_validation` metrics scope (see the `#[metrics]` attribute below).
// Several measurements come as a gauge/histogram pair (`state_root_*`, `payload_validation_*`);
// the `record_*` helpers on this type update both members of a pair together.
// NOTE(review): `#[derive(Metrics)]` presumably uses each field's doc comment as that metric's
// description, so rewording a field doc may change exported text — confirm before editing.
/// Metrics for non-execution related block validation.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    /// Total number of storage tries updated in the state root calculation
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Total number of times the parallel state root computation fell back to regular.
    pub(crate) state_root_parallel_fallback_total: Counter,
    /// Latest state root duration, ie the time spent blocked waiting for the state root.
    pub(crate) state_root_duration: Gauge,
    /// Histogram for state root duration ie the time spent blocked waiting for the state root
    pub(crate) state_root_histogram: Histogram,
    /// Trie input computation duration
    pub(crate) trie_input_duration: Histogram,
    /// Payload conversion and validation latency
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation latency
    pub(crate) payload_validation_histogram: Histogram,
    /// Payload processor spawning duration
    pub(crate) spawn_payload_processor: Histogram,
    /// Post-execution validation duration
    pub(crate) post_execution_validation_duration: Histogram,
    /// Total duration of the new payload call
    pub(crate) total_duration: Histogram,
}
188
189impl BlockValidationMetrics {
190    /// Records a new state root time, updating both the histogram and state root gauge
191    pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
192        self.state_root_storage_tries_updated_total
193            .increment(trie_output.storage_tries_ref().len() as u64);
194        self.state_root_duration.set(elapsed_as_secs);
195        self.state_root_histogram.record(elapsed_as_secs);
196    }
197
198    /// Records a new payload validation time, updating both the histogram and the payload
199    /// validation gauge
200    pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
201        self.payload_validation_duration.set(elapsed_as_secs);
202        self.payload_validation_histogram.record(elapsed_as_secs);
203    }
204}
205
// Uses the `blockchain_tree.block_buffer` metrics scope (see the `#[metrics]` attribute below).
// NOTE(review): `#[derive(Metrics)]` presumably uses each field's doc comment as that metric's
// description, so rewording a field doc may change exported text — confirm before editing.
/// Metrics for the blockchain tree block buffer
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
pub(crate) struct BlockBufferMetrics {
    /// Total blocks in the block buffer
    pub blocks: Gauge,
}
213
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// A simple mock executor for testing that doesn't require complex EVM setup.
    ///
    /// It never fails: every trait method returns a canned success value, and the installed
    /// state hook (if any) is fed the fixed `state` on each transaction and once more in
    /// `finish`.
    struct MockExecutor {
        /// State handed to the hook whenever it is invoked.
        state: EvmState,
        /// Hook installed through `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        /// Creates a mock executor that reports `state` to its hook; no hook is installed yet.
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    // Mock Evm type for testing
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            // Call hook with our mock state for each transaction
            if let Some(hook) = self.hook.as_mut() {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            Ok(ResultAndState::new(
                ExecutionResult::Success {
                    reason: SuccessReason::Return,
                    gas_used: 1000, // Mock gas used
                    gas_refunded: 0,
                    logs: vec![],
                    output: Output::Call(Bytes::from(vec![])),
                },
                Default::default(),
            ))
        }

        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(1000)
        }

        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            // Call hook with our mock state
            if let Some(mut hook) = hook {
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            // Create a mock EVM
            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            // Return a successful result with a fixed amount of gas used
            Ok((
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                    blob_gas_used: 0,
                },
            ))
        }

        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that reports every invocation by sending `output` over a channel.
    struct ChannelStateHook {
        /// Value sent on each invocation.
        output: i32,
        /// Sending half of the channel the test listens on.
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            // Best effort: a dropped receiver must not make the hook panic.
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a [`DebuggingRecorder`] as the global metrics recorder and returns its
    /// snapshotter.
    ///
    /// Panics if the recorder cannot be installed.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().expect("failed to install the debugging metrics recorder");
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        // The execution outcome is not under test here, only the hook invocation is.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        // `MockExecutor::finish` always invokes the installed hook, so a message must be waiting.
        // Asserting on it (instead of tolerating an empty channel) lets the test actually fail
        // when the hook is no longer wired through.
        assert_eq!(rx.try_recv(), Ok(expected_output));
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Pre-populate some metrics to ensure they exist
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // Create a state with some data
        let state = {
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        // Execute; the outcome is ignored, only the recorded metrics matter here
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        let snapshot = snapshotter.snapshot().into_vec();

        // Verify that metrics were registered
        let found_metrics = snapshot
            .into_iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}