1use crate::tree::MeteredStateHook;
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4    block::{BlockExecutor, ExecutableTx},
5    Evm,
6};
7use core::borrow::BorrowMut;
8use reth_errors::BlockExecutionError;
9use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
10use reth_execution_types::BlockExecutionOutput;
11use reth_metrics::{
12    metrics::{Counter, Gauge, Histogram},
13    Metrics,
14};
15use reth_primitives_traits::SignedTransaction;
16use reth_trie::updates::TrieUpdates;
17use revm::database::{states::bundle_state::BundleRetention, State};
18use std::time::Instant;
19use tracing::{debug_span, trace};
20
21#[derive(Debug, Default)]
23pub(crate) struct EngineApiMetrics {
24    pub(crate) engine: EngineMetrics,
26    pub(crate) executor: ExecutorMetrics,
28    pub(crate) block_validation: BlockValidationMetrics,
30    pub tree: TreeMetrics,
32}
33
34impl EngineApiMetrics {
35    fn metered<F, R>(&self, f: F) -> R
37    where
38        F: FnOnce() -> (u64, R),
39    {
40        let execute_start = Instant::now();
42        let (gas_used, output) = f();
43        let execution_duration = execute_start.elapsed().as_secs_f64();
44
45        self.executor.gas_processed_total.increment(gas_used);
47        self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
48        self.executor.gas_used_histogram.record(gas_used as f64);
49        self.executor.execution_histogram.record(execution_duration);
50        self.executor.execution_duration.set(execution_duration);
51
52        output
53    }
54
55    pub(crate) fn execute_metered<E, DB>(
61        &self,
62        executor: E,
63        transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
64        state_hook: Box<dyn OnStateHook>,
65    ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
66    where
67        DB: alloy_evm::Database,
68        E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
69    {
70        let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
74
75        let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
76
77        let f = || {
78            executor.apply_pre_execution_changes()?;
79            for tx in transactions {
80                let tx = tx?;
81                let span =
82                    debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
83                let _enter = span.enter();
84                trace!(target: "engine::tree", "Executing transaction");
85                executor.execute_transaction(tx)?;
86            }
87            executor.finish().map(|(evm, result)| (evm.into_db(), result))
88        };
89
90        let (mut db, result) = self.metered(|| {
92            let res = f();
93            let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
94            (gas_used, res)
95        })?;
96
97        db.borrow_mut().merge_transitions(BundleRetention::Reverts);
99        let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
100
101        let accounts = output.state.state.len();
103        let storage_slots =
104            output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
105        let bytecodes = output.state.contracts.len();
106
107        self.executor.accounts_updated_histogram.record(accounts as f64);
108        self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
109        self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
110
111        Ok(output)
112    }
113}
114
115#[derive(Metrics)]
117#[metrics(scope = "blockchain_tree")]
118pub(crate) struct TreeMetrics {
119    pub canonical_chain_height: Gauge,
121    pub reorgs: Counter,
123    pub latest_reorg_depth: Gauge,
125    pub safe_block_height: Gauge,
127    pub finalized_block_height: Gauge,
129}
130
131#[derive(Metrics)]
133#[metrics(scope = "consensus.engine.beacon")]
134pub(crate) struct EngineMetrics {
135    pub(crate) executed_blocks: Gauge,
137    pub(crate) inserted_already_executed_blocks: Counter,
139    pub(crate) pipeline_runs: Counter,
141    pub(crate) forkchoice_updated_messages: Counter,
143    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
145    pub(crate) executed_new_block_cache_miss: Counter,
147    pub(crate) new_payload_messages: Counter,
149    pub(crate) persistence_duration: Histogram,
151    pub(crate) failed_new_payload_response_deliveries: Counter,
157    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
159    pub(crate) block_insert_total_duration: Histogram,
161}
162
163#[derive(Metrics)]
165#[metrics(scope = "sync.block_validation")]
166pub(crate) struct BlockValidationMetrics {
167    pub(crate) state_root_storage_tries_updated_total: Counter,
169    pub(crate) state_root_parallel_fallback_total: Counter,
171    pub(crate) state_root_duration: Gauge,
173    pub(crate) state_root_histogram: Histogram,
175    pub(crate) trie_input_duration: Histogram,
177    pub(crate) payload_validation_duration: Gauge,
179    pub(crate) payload_validation_histogram: Histogram,
181    pub(crate) spawn_payload_processor: Histogram,
183    pub(crate) post_execution_validation_duration: Histogram,
185    pub(crate) total_duration: Histogram,
187}
188
189impl BlockValidationMetrics {
190    pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
192        self.state_root_storage_tries_updated_total
193            .increment(trie_output.storage_tries_ref().len() as u64);
194        self.state_root_duration.set(elapsed_as_secs);
195        self.state_root_histogram.record(elapsed_as_secs);
196    }
197
198    pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
201        self.payload_validation_duration.set(elapsed_as_secs);
202        self.payload_validation_histogram.record(elapsed_as_secs);
203    }
204}
205
206#[derive(Metrics)]
208#[metrics(scope = "blockchain_tree.block_buffer")]
209pub(crate) struct BlockBufferMetrics {
210    pub blocks: Gauge,
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use alloy_eips::eip7685::Requests;
218    use alloy_evm::block::StateChangeSource;
219    use alloy_primitives::{B256, U256};
220    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
221    use reth_ethereum_primitives::{Receipt, TransactionSigned};
222    use reth_evm_ethereum::EthEvm;
223    use reth_execution_types::BlockExecutionResult;
224    use reth_primitives_traits::RecoveredBlock;
225    use revm::{
226        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
227        database::State,
228        database_interface::EmptyDB,
229        inspector::NoOpInspector,
230        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
231        Context, MainBuilder, MainContext,
232    };
233    use revm_primitives::Bytes;
234    use std::sync::mpsc;
235
236    struct MockExecutor {
238        state: EvmState,
239        hook: Option<Box<dyn OnStateHook>>,
240    }
241
242    impl MockExecutor {
243        fn new(state: EvmState) -> Self {
244            Self { state, hook: None }
245        }
246    }
247
248    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;
250
251    impl BlockExecutor for MockExecutor {
252        type Transaction = TransactionSigned;
253        type Receipt = Receipt;
254        type Evm = MockEvm;
255
256        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
257            Ok(())
258        }
259
260        fn execute_transaction_without_commit(
261            &mut self,
262            _tx: impl ExecutableTx<Self>,
263        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
264            if let Some(hook) = self.hook.as_mut() {
266                hook.on_state(StateChangeSource::Transaction(0), &self.state);
267            }
268
269            Ok(ResultAndState::new(
270                ExecutionResult::Success {
271                    reason: SuccessReason::Return,
272                    gas_used: 1000, gas_refunded: 0,
274                    logs: vec![],
275                    output: Output::Call(Bytes::from(vec![])),
276                },
277                Default::default(),
278            ))
279        }
280
281        fn commit_transaction(
282            &mut self,
283            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
284            _tx: impl ExecutableTx<Self>,
285        ) -> Result<u64, BlockExecutionError> {
286            Ok(1000)
287        }
288
289        fn finish(
290            self,
291        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
292            let Self { hook, state, .. } = self;
293
294            if let Some(mut hook) = hook {
296                hook.on_state(StateChangeSource::Transaction(0), &state);
297            }
298
299            let db = State::builder()
301                .with_database(EmptyDB::default())
302                .with_bundle_update()
303                .without_state_clear()
304                .build();
305            let evm = EthEvm::new(
306                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
307                false,
308            );
309
310            Ok((
312                evm,
313                BlockExecutionResult {
314                    receipts: vec![],
315                    requests: Requests::default(),
316                    gas_used: 1000,
317                    blob_gas_used: 0,
318                },
319            ))
320        }
321
322        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
323            self.hook = hook;
324        }
325
326        fn evm(&self) -> &Self::Evm {
327            panic!("Mock executor evm() not implemented")
328        }
329
330        fn evm_mut(&mut self) -> &mut Self::Evm {
331            panic!("Mock executor evm_mut() not implemented")
332        }
333    }
334
335    struct ChannelStateHook {
336        output: i32,
337        sender: mpsc::Sender<i32>,
338    }
339
340    impl OnStateHook for ChannelStateHook {
341        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
342            let _ = self.sender.send(self.output);
343        }
344    }
345
346    fn setup_test_recorder() -> Snapshotter {
347        let recorder = DebuggingRecorder::new();
348        let snapshotter = recorder.snapshotter();
349        recorder.install().unwrap();
350        snapshotter
351    }
352
353    #[test]
354    fn test_executor_metrics_hook_called() {
355        let metrics = EngineApiMetrics::default();
356        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
357
358        let (tx, rx) = mpsc::channel();
359        let expected_output = 42;
360        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });
361
362        let state = EvmState::default();
363        let executor = MockExecutor::new(state);
364
365        let _result = metrics.execute_metered::<_, EmptyDB>(
367            executor,
368            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
369            state_hook,
370        );
371
372        match rx.try_recv() {
374            Ok(actual_output) => assert_eq!(actual_output, expected_output),
375            Err(_) => {
376                }
379        }
380    }
381
382    #[test]
383    fn test_executor_metrics_hook_metrics_recorded() {
384        let snapshotter = setup_test_recorder();
385        let metrics = EngineApiMetrics::default();
386
387        metrics.executor.gas_processed_total.increment(0);
389        metrics.executor.gas_per_second.set(0.0);
390        metrics.executor.gas_used_histogram.record(0.0);
391
392        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
393
394        let (tx, _rx) = mpsc::channel();
395        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });
396
397        let state = {
399            let mut state = EvmState::default();
400            let storage =
401                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
402            state.insert(
403                Default::default(),
404                Account {
405                    info: AccountInfo {
406                        balance: U256::from(100),
407                        nonce: 10,
408                        code_hash: B256::random(),
409                        code: Default::default(),
410                    },
411                    storage,
412                    status: AccountStatus::default(),
413                    transaction_id: 0,
414                },
415            );
416            state
417        };
418
419        let executor = MockExecutor::new(state);
420
421        let _result = metrics.execute_metered::<_, EmptyDB>(
423            executor,
424            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
425            state_hook,
426        );
427
428        let snapshot = snapshotter.snapshot().into_vec();
429
430        let mut found_metrics = false;
432        for (key, _unit, _desc, _value) in snapshot {
433            let metric_name = key.key().name();
434            if metric_name.starts_with("sync.execution") {
435                found_metrics = true;
436                break;
437            }
438        }
439
440        assert!(found_metrics, "Expected to find sync.execution metrics");
441    }
442}