reth_engine_tree/tree/
metrics.rs

1use crate::tree::{error::InsertBlockFatalError, MeteredStateHook, TreeOutcome};
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4    block::{BlockExecutor, ExecutableTx},
5    Evm,
6};
7use alloy_rpc_types_engine::{PayloadStatus, PayloadStatusEnum};
8use core::borrow::BorrowMut;
9use reth_engine_primitives::{ForkchoiceStatus, OnForkChoiceUpdated};
10use reth_errors::{BlockExecutionError, ProviderError};
11use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
12use reth_execution_types::BlockExecutionOutput;
13use reth_metrics::{
14    metrics::{Counter, Gauge, Histogram},
15    Metrics,
16};
17use reth_primitives_traits::SignedTransaction;
18use reth_trie::updates::TrieUpdates;
19use revm::database::{states::bundle_state::BundleRetention, State};
20use revm_primitives::Address;
21use std::time::Instant;
22use tracing::{debug_span, trace};
23
24/// Metrics for the `EngineApi`.
25#[derive(Debug, Default)]
26pub(crate) struct EngineApiMetrics {
27    /// Engine API-specific metrics.
28    pub(crate) engine: EngineMetrics,
29    /// Block executor metrics.
30    pub(crate) executor: ExecutorMetrics,
31    /// Metrics for block validation
32    pub(crate) block_validation: BlockValidationMetrics,
33    /// Canonical chain and reorg related metrics
34    pub tree: TreeMetrics,
35}
36
37impl EngineApiMetrics {
38    /// Helper function for metered execution
39    fn metered<F, R>(&self, f: F) -> R
40    where
41        F: FnOnce() -> (u64, R),
42    {
43        // Execute the block and record the elapsed time.
44        let execute_start = Instant::now();
45        let (gas_used, output) = f();
46        let execution_duration = execute_start.elapsed().as_secs_f64();
47
48        // Update gas metrics.
49        self.executor.gas_processed_total.increment(gas_used);
50        self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
51        self.executor.gas_used_histogram.record(gas_used as f64);
52        self.executor.execution_histogram.record(execution_duration);
53        self.executor.execution_duration.set(execution_duration);
54
55        output
56    }
57
58    /// Execute the given block using the provided [`BlockExecutor`] and update metrics for the
59    /// execution.
60    ///
61    /// This method updates metrics for execution time, gas usage, and the number
62    /// of accounts, storage slots and bytecodes loaded and updated.
63    pub(crate) fn execute_metered<E, DB>(
64        &self,
65        executor: E,
66        mut transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
67        state_hook: Box<dyn OnStateHook>,
68    ) -> Result<(BlockExecutionOutput<E::Receipt>, Vec<Address>), BlockExecutionError>
69    where
70        DB: alloy_evm::Database,
71        E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
72    {
73        // clone here is cheap, all the metrics are Option<Arc<_>>. additionally
74        // they are globally registered so that the data recorded in the hook will
75        // be accessible.
76        let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
77
78        let mut senders = Vec::new();
79        let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
80
81        let f = || {
82            let start = Instant::now();
83            debug_span!(target: "engine::tree", "pre execution")
84                .entered()
85                .in_scope(|| executor.apply_pre_execution_changes())?;
86            self.executor.pre_execution_histogram.record(start.elapsed());
87
88            let exec_span = debug_span!(target: "engine::tree", "execution").entered();
89            loop {
90                let start = Instant::now();
91                let Some(tx) = transactions.next() else { break };
92                self.executor.transaction_wait_histogram.record(start.elapsed());
93
94                let tx = tx?;
95                senders.push(*tx.signer());
96
97                let span =
98                    debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
99                let enter = span.entered();
100                trace!(target: "engine::tree", "Executing transaction");
101                let start = Instant::now();
102                let gas_used = executor.execute_transaction(tx)?;
103                self.executor.transaction_execution_histogram.record(start.elapsed());
104
105                // record the tx gas used
106                enter.record("gas_used", gas_used);
107            }
108            drop(exec_span);
109
110            let start = Instant::now();
111            let result = debug_span!(target: "engine::tree", "finish")
112                .entered()
113                .in_scope(|| executor.finish())
114                .map(|(evm, result)| (evm.into_db(), result));
115            self.executor.post_execution_histogram.record(start.elapsed());
116
117            result
118        };
119
120        // Use metered to execute and track timing/gas metrics
121        let (mut db, result) = self.metered(|| {
122            let res = f();
123            let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
124            (gas_used, res)
125        })?;
126
127        // merge transitions into bundle state
128        debug_span!(target: "engine::tree", "merge transitions")
129            .entered()
130            .in_scope(|| db.borrow_mut().merge_transitions(BundleRetention::Reverts));
131        let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
132
133        // Update the metrics for the number of accounts, storage slots and bytecodes updated
134        let accounts = output.state.state.len();
135        let storage_slots =
136            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
137        let bytecodes = output.state.contracts.len();
138
139        self.executor.accounts_updated_histogram.record(accounts as f64);
140        self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
141        self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
142
143        Ok((output, senders))
144    }
145}
146
/// Metrics for the entire blockchain tree
// NOTE(review): the `///` field comments below are presumably consumed by the `Metrics` derive
// as metric descriptions (i.e. they are runtime strings) — confirm before rewording them.
// All metrics here are registered under the `blockchain_tree` scope.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub(crate) struct TreeMetrics {
    /// The highest block number in the canonical chain
    pub canonical_chain_height: Gauge,
    /// The number of reorgs
    // A `Counter`: only ever incremented.
    pub reorgs: Counter,
    /// The latest reorg depth
    pub latest_reorg_depth: Gauge,
    /// The current safe block height (this is required by optimism)
    pub safe_block_height: Gauge,
    /// The current finalized block height (this is required by optimism)
    pub finalized_block_height: Gauge,
}
162
163/// Metrics for the `EngineApi`.
164#[derive(Metrics)]
165#[metrics(scope = "consensus.engine.beacon")]
166pub(crate) struct EngineMetrics {
167    /// Engine API forkchoiceUpdated response type metrics
168    #[metric(skip)]
169    pub(crate) forkchoice_updated: ForkchoiceUpdatedMetrics,
170    /// Engine API newPayload response type metrics
171    #[metric(skip)]
172    pub(crate) new_payload: NewPayloadStatusMetrics,
173    /// How many executed blocks are currently stored.
174    pub(crate) executed_blocks: Gauge,
175    /// How many already executed blocks were directly inserted into the tree.
176    pub(crate) inserted_already_executed_blocks: Counter,
177    /// The number of times the pipeline was run.
178    pub(crate) pipeline_runs: Counter,
179    /// Newly arriving block hash is not present in executed blocks cache storage
180    pub(crate) executed_new_block_cache_miss: Counter,
181    /// Histogram of persistence operation durations (in seconds)
182    pub(crate) persistence_duration: Histogram,
183    /// Tracks the how often we failed to deliver a newPayload response.
184    ///
185    /// This effectively tracks how often the message sender dropped the channel and indicates a CL
186    /// request timeout (e.g. it took more than 8s to send the response and the CL terminated the
187    /// request which resulted in a closed channel).
188    pub(crate) failed_new_payload_response_deliveries: Counter,
189    /// Tracks the how often we failed to deliver a forkchoice update response.
190    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
191    /// block insert duration
192    pub(crate) block_insert_total_duration: Histogram,
193}
194
/// Metrics for engine forkchoiceUpdated responses.
// NOTE(review): the `///` field comments are presumably used as metric descriptions by the
// `Metrics` derive — treat them as runtime strings. All counters below are updated by
// `ForkchoiceUpdatedMetrics::update_response_metrics`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct ForkchoiceUpdatedMetrics {
    /// The total count of forkchoice updated messages received.
    pub(crate) forkchoice_updated_messages: Counter,
    /// The total count of forkchoice updated messages with payload received.
    // Incremented only when the call carried payload attributes (`has_attrs`).
    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Valid`](ForkchoiceStatus::Valid).
    pub(crate) forkchoice_updated_valid: Counter,
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Invalid`](ForkchoiceStatus::Invalid).
    pub(crate) forkchoice_updated_invalid: Counter,
    /// The total count of forkchoice updated messages that we responded to with
    /// [`Syncing`](ForkchoiceStatus::Syncing).
    pub(crate) forkchoice_updated_syncing: Counter,
    /// The total count of forkchoice updated messages that were unsuccessful, i.e. we responded
    /// with an error type that is not a [`PayloadStatusEnum`].
    pub(crate) forkchoice_updated_error: Counter,
    /// Latency for the forkchoice updated calls.
    pub(crate) forkchoice_updated_latency: Histogram,
    /// Latency for the last forkchoice updated call.
    // Same value as the latest histogram sample, exposed as a gauge.
    pub(crate) forkchoice_updated_last: Gauge,
    /// Time diff between new payload call response and the next forkchoice updated call request.
    pub(crate) new_payload_forkchoice_updated_time_diff: Histogram,
}
222
223impl ForkchoiceUpdatedMetrics {
224    /// Increment the forkchoiceUpdated counter based on the given result
225    pub(crate) fn update_response_metrics(
226        &self,
227        start: Instant,
228        latest_new_payload_at: &mut Option<Instant>,
229        has_attrs: bool,
230        result: &Result<TreeOutcome<OnForkChoiceUpdated>, ProviderError>,
231    ) {
232        let elapsed = start.elapsed();
233        match result {
234            Ok(outcome) => match outcome.outcome.forkchoice_status() {
235                ForkchoiceStatus::Valid => self.forkchoice_updated_valid.increment(1),
236                ForkchoiceStatus::Invalid => self.forkchoice_updated_invalid.increment(1),
237                ForkchoiceStatus::Syncing => self.forkchoice_updated_syncing.increment(1),
238            },
239            Err(_) => self.forkchoice_updated_error.increment(1),
240        }
241        self.forkchoice_updated_messages.increment(1);
242        if has_attrs {
243            self.forkchoice_with_attributes_updated_messages.increment(1);
244        }
245        self.forkchoice_updated_latency.record(elapsed);
246        self.forkchoice_updated_last.set(elapsed);
247        if let Some(latest_new_payload_at) = latest_new_payload_at.take() {
248            self.new_payload_forkchoice_updated_time_diff.record(start - latest_new_payload_at);
249        }
250    }
251}
252
/// Metrics for engine newPayload responses.
// NOTE(review): the `///` comments on metric fields are presumably used as metric
// descriptions by the `Metrics` derive — treat them as runtime strings.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct NewPayloadStatusMetrics {
    /// Finish time of the latest new payload call.
    // Plain state rather than a metric (hence `skip`); written on every call to
    // `NewPayloadStatusMetrics::update_response_metrics`.
    #[metric(skip)]
    pub(crate) latest_at: Option<Instant>,
    /// The total count of new payload messages received.
    pub(crate) new_payload_messages: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Valid](PayloadStatusEnum::Valid).
    pub(crate) new_payload_valid: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Invalid](PayloadStatusEnum::Invalid).
    pub(crate) new_payload_invalid: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Syncing](PayloadStatusEnum::Syncing).
    pub(crate) new_payload_syncing: Counter,
    /// The total count of new payload messages that we responded to with
    /// [Accepted](PayloadStatusEnum::Accepted).
    pub(crate) new_payload_accepted: Counter,
    /// The total count of new payload messages that were unsuccessful, i.e. we responded with an
    /// error type that is not a [`PayloadStatusEnum`].
    pub(crate) new_payload_error: Counter,
    /// The total gas of valid new payload messages received.
    // Despite the "total" wording this is a histogram: one sample per valid payload.
    pub(crate) new_payload_total_gas: Histogram,
    /// The gas per second of valid new payload messages received.
    pub(crate) new_payload_gas_per_second: Histogram,
    /// The gas per second for the last new payload call.
    pub(crate) new_payload_gas_per_second_last: Gauge,
    /// Latency for the new payload calls.
    pub(crate) new_payload_latency: Histogram,
    /// Latency for the last new payload call.
    pub(crate) new_payload_last: Gauge,
}
288
289impl NewPayloadStatusMetrics {
290    /// Increment the newPayload counter based on the given result
291    pub(crate) fn update_response_metrics(
292        &mut self,
293        start: Instant,
294        result: &Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError>,
295        gas_used: u64,
296    ) {
297        let finish = Instant::now();
298        let elapsed = finish - start;
299
300        self.latest_at = Some(finish);
301        match result {
302            Ok(outcome) => match outcome.outcome.status {
303                PayloadStatusEnum::Valid => {
304                    self.new_payload_valid.increment(1);
305                    self.new_payload_total_gas.record(gas_used as f64);
306                    let gas_per_second = gas_used as f64 / elapsed.as_secs_f64();
307                    self.new_payload_gas_per_second.record(gas_per_second);
308                    self.new_payload_gas_per_second_last.set(gas_per_second);
309                }
310                PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
311                PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
312                PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
313            },
314            Err(_) => self.new_payload_error.increment(1),
315        }
316        self.new_payload_messages.increment(1);
317        self.new_payload_latency.record(elapsed);
318        self.new_payload_last.set(elapsed);
319    }
320}
321
/// Metrics for non-execution related block validation.
// NOTE(review): the `///` field comments are presumably used as metric descriptions by the
// `Metrics` derive — treat them as runtime strings. Several durations are exposed twice: as a
// "latest value" `Gauge` and as a `Histogram` (see `record_state_root` and
// `record_payload_validation`).
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    /// Total number of storage tries updated in the state root calculation
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Total number of times the parallel state root computation fell back to regular.
    pub(crate) state_root_parallel_fallback_total: Counter,
    /// Latest state root duration, ie the time spent blocked waiting for the state root.
    pub(crate) state_root_duration: Gauge,
    /// Histogram for state root duration ie the time spent blocked waiting for the state root
    pub(crate) state_root_histogram: Histogram,
    /// Histogram of deferred trie computation duration.
    pub(crate) deferred_trie_compute_duration: Histogram,
    /// Histogram of time spent waiting for deferred trie data to become available.
    pub(crate) deferred_trie_wait_duration: Histogram,
    /// Trie input computation duration
    pub(crate) trie_input_duration: Histogram,
    /// Payload conversion and validation latency
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation latency
    pub(crate) payload_validation_histogram: Histogram,
    /// Payload processor spawning duration
    pub(crate) spawn_payload_processor: Histogram,
    /// Post-execution validation duration
    pub(crate) post_execution_validation_duration: Histogram,
    /// Total duration of the new payload call
    pub(crate) total_duration: Histogram,
}
351
352impl BlockValidationMetrics {
353    /// Records a new state root time, updating both the histogram and state root gauge
354    pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
355        self.state_root_storage_tries_updated_total
356            .increment(trie_output.storage_tries_ref().len() as u64);
357        self.state_root_duration.set(elapsed_as_secs);
358        self.state_root_histogram.record(elapsed_as_secs);
359    }
360
361    /// Records a new payload validation time, updating both the histogram and the payload
362    /// validation gauge
363    pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
364        self.payload_validation_duration.set(elapsed_as_secs);
365        self.payload_validation_histogram.record(elapsed_as_secs);
366    }
367}
368
/// Metrics for the blockchain tree block buffer
// Registered under the `blockchain_tree.block_buffer` scope. NOTE(review): the `///` field
// comment is presumably used as the metric description by the `Metrics` derive.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
pub(crate) struct BlockBufferMetrics {
    /// Total blocks in the block buffer
    pub blocks: Gauge,
}
376
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// A simple mock executor for testing that doesn't require complex EVM setup.
    ///
    /// Both `execute_transaction_without_commit` and `finish` replay `state` to the installed
    /// state hook (if any); `finish` always succeeds with a freshly built EVM.
    struct MockExecutor {
        state: EvmState,
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    // Mock Evm type for testing
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            // Call hook with our mock state for each transaction
            if let Some(hook) = self.hook.as_mut() {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            Ok(ResultAndState::new(
                ExecutionResult::Success {
                    reason: SuccessReason::Return,
                    gas_used: 1000, // Mock gas used
                    gas_refunded: 0,
                    logs: vec![],
                    output: Output::Call(Bytes::from(vec![])),
                },
                Default::default(),
            ))
        }

        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(1000)
        }

        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            // Call hook with our mock state
            if let Some(mut hook) = hook {
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            // Create a mock EVM
            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            // Report a successful block with fixed gas usage and no receipts.
            Ok((
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                    blob_gas_used: 0,
                },
            ))
        }

        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends `output` over `sender` on every state change, ignoring send errors.
    struct ChannelStateHook {
        output: i32,
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a global debugging recorder and returns a snapshotter for it.
    ///
    /// Panics if a global recorder is already installed in this process.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        // The default block has no transactions and every mock step succeeds, so execution
        // must complete without error.
        let (_output, senders) = metrics
            .execute_metered::<_, EmptyDB>(
                executor,
                input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
                state_hook,
            )
            .expect("mock execution should succeed");
        assert!(senders.is_empty(), "no transactions means no senders");

        // `finish()` replays the mock state to the installed hook, which must reach our channel
        // through the metered wrapper.
        let actual_output =
            rx.try_recv().expect("state hook should have been invoked by `finish()`");
        assert_eq!(actual_output, expected_output);
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Pre-populate some metrics to ensure they exist
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // Create a state with some data
        let state = {
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        // Execute against the mock; `finish()` feeds the populated state through the metered
        // hook and the executor metrics are updated along the way.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        let snapshot = snapshotter.snapshot().into_vec();

        // Verify that executor metrics were registered with the test recorder.
        let found_metrics = snapshot
            .into_iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}