1use crate::tree::MeteredStateHook;
2use alloy_consensus::transaction::TxHashRef;
3use alloy_evm::{
4 block::{BlockExecutor, ExecutableTx},
5 Evm,
6};
7use core::borrow::BorrowMut;
8use reth_errors::BlockExecutionError;
9use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
10use reth_execution_types::BlockExecutionOutput;
11use reth_metrics::{
12 metrics::{Counter, Gauge, Histogram},
13 Metrics,
14};
15use reth_primitives_traits::SignedTransaction;
16use reth_trie::updates::TrieUpdates;
17use revm::database::{states::bundle_state::BundleRetention, State};
18use std::time::Instant;
19use tracing::{debug_span, trace};
20
/// Groups the metric sets used by the engine API.
#[derive(Debug, Default)]
pub(crate) struct EngineApiMetrics {
    /// Engine-level metrics, see [`EngineMetrics`].
    pub(crate) engine: EngineMetrics,
    /// Block executor metrics; updated by the metered execution helpers of this type.
    pub(crate) executor: ExecutorMetrics,
    /// Block validation metrics, see [`BlockValidationMetrics`].
    pub(crate) block_validation: BlockValidationMetrics,
    /// Blockchain tree metrics, see [`TreeMetrics`].
    pub tree: TreeMetrics,
}
33
34impl EngineApiMetrics {
35 fn metered<F, R>(&self, f: F) -> R
37 where
38 F: FnOnce() -> (u64, R),
39 {
40 let execute_start = Instant::now();
42 let (gas_used, output) = f();
43 let execution_duration = execute_start.elapsed().as_secs_f64();
44
45 self.executor.gas_processed_total.increment(gas_used);
47 self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
48 self.executor.gas_used_histogram.record(gas_used as f64);
49 self.executor.execution_histogram.record(execution_duration);
50 self.executor.execution_duration.set(execution_duration);
51
52 output
53 }
54
55 pub(crate) fn execute_metered<E, DB>(
61 &self,
62 executor: E,
63 transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
64 state_hook: Box<dyn OnStateHook>,
65 ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
66 where
67 DB: alloy_evm::Database,
68 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
69 {
70 let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
74
75 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
76
77 let f = || {
78 executor.apply_pre_execution_changes()?;
79 for tx in transactions {
80 let tx = tx?;
81 let span =
82 debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
83 let _enter = span.enter();
84 trace!(target: "engine::tree", "Executing transaction");
85 executor.execute_transaction(tx)?;
86 }
87 executor.finish().map(|(evm, result)| (evm.into_db(), result))
88 };
89
90 let (mut db, result) = self.metered(|| {
92 let res = f();
93 let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
94 (gas_used, res)
95 })?;
96
97 db.borrow_mut().merge_transitions(BundleRetention::Reverts);
99 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
100
101 let accounts = output.state.state.len();
103 let storage_slots =
104 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
105 let bytecodes = output.state.contracts.len();
106
107 self.executor.accounts_updated_histogram.record(accounts as f64);
108 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
109 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
110
111 Ok(output)
112 }
113}
114
/// Metrics registered under the `blockchain_tree` scope.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub(crate) struct TreeMetrics {
    /// Height of the canonical chain.
    pub canonical_chain_height: Gauge,
    /// Number of reorgs.
    pub reorgs: Counter,
    /// Depth of the latest reorg.
    pub latest_reorg_depth: Gauge,
}
126
/// Metrics registered under the `consensus.engine.beacon` scope.
// NOTE(review): the units of the duration histograms are not visible in this file — confirm
// them at the recording sites.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineMetrics {
    /// Number of executed blocks.
    pub(crate) executed_blocks: Gauge,
    /// Number of already executed blocks that were inserted.
    pub(crate) inserted_already_executed_blocks: Counter,
    /// Number of pipeline runs.
    pub(crate) pipeline_runs: Counter,
    /// Number of forkchoice updated messages.
    pub(crate) forkchoice_updated_messages: Counter,
    /// Number of forkchoice updated messages that carried attributes.
    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
    /// Number of executed new block cache misses.
    pub(crate) executed_new_block_cache_miss: Counter,
    /// Number of new payload messages.
    pub(crate) new_payload_messages: Counter,
    /// Histogram of persistence durations.
    pub(crate) persistence_duration: Histogram,
    /// Number of new payload responses that failed to be delivered.
    pub(crate) failed_new_payload_response_deliveries: Counter,
    /// Number of forkchoice updated responses that failed to be delivered.
    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
    /// Histogram of total block insert durations.
    pub(crate) block_insert_total_duration: Histogram,
}
158
/// Metrics registered under the `sync.block_validation` scope.
// NOTE(review): only the state root and payload validation metrics are shown to be in seconds
// by the `record_*` helpers in this file; confirm the units of the other histograms.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    /// Total number of storage tries updated by state root computations.
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Total number of parallel state root fallbacks.
    pub(crate) state_root_parallel_fallback_total: Counter,
    /// Latest state root duration in seconds.
    pub(crate) state_root_duration: Gauge,
    /// Histogram of state root durations in seconds.
    pub(crate) state_root_histogram: Histogram,
    /// Histogram of trie input durations.
    pub(crate) trie_input_duration: Histogram,
    /// Latest payload validation duration in seconds.
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation durations in seconds.
    pub(crate) payload_validation_histogram: Histogram,
    /// Histogram for spawning the payload processor.
    pub(crate) spawn_payload_processor: Histogram,
    /// Histogram of post-execution validation durations.
    pub(crate) post_execution_validation_duration: Histogram,
    /// Histogram of total durations.
    pub(crate) total_duration: Histogram,
}
184
185impl BlockValidationMetrics {
186 pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
188 self.state_root_storage_tries_updated_total
189 .increment(trie_output.storage_tries_ref().len() as u64);
190 self.state_root_duration.set(elapsed_as_secs);
191 self.state_root_histogram.record(elapsed_as_secs);
192 }
193
194 pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
197 self.payload_validation_duration.set(elapsed_as_secs);
198 self.payload_validation_histogram.record(elapsed_as_secs);
199 }
200}
201
/// Metrics registered under the `blockchain_tree.block_buffer` scope.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
pub(crate) struct BlockBufferMetrics {
    /// Number of blocks in the block buffer.
    pub blocks: Gauge,
}
209
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// Mock executor that reports a fixed [`EvmState`] to the installed state hook.
    struct MockExecutor {
        /// State passed to the hook on every notification.
        state: EvmState,
        /// Hook installed through `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        /// Creates a mock executor without a state hook.
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    /// EVM type returned by the mock executor's `finish`.
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        /// No pre-execution work is needed for the mock.
        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        /// Notifies the hook with the stored state and returns a canned successful result.
        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            // Call the hook with the mock state, if one is installed.
            if let Some(hook) = self.hook.as_mut() {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            // Fixed result: success with 1000 gas used and an empty output.
            Ok(ResultAndState::new(
                ExecutionResult::Success {
                    reason: SuccessReason::Return,
                    gas_used: 1000, gas_refunded: 0,
                    logs: vec![],
                    output: Output::Call(Bytes::from(vec![])),
                },
                Default::default(),
            ))
        }

        /// Commits nothing and reports the same fixed gas usage of 1000.
        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(1000)
        }

        /// Notifies the hook one last time and returns a fresh EVM plus a fixed block result.
        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            // The hook is consumed here, so this is its final notification.
            if let Some(mut hook) = hook {
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            // Build a fresh database over `EmptyDB`; the mock state is not written into it.
            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            // Fixed block result: no receipts, default requests, 1000 gas used.
            Ok((
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                },
            ))
        }

        /// Stores (or clears) the state hook.
        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        /// Not supported by the mock; panics when called.
        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        /// Not supported by the mock; panics when called.
        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends a fixed value over a channel whenever it is invoked.
    struct ChannelStateHook {
        /// Value sent on every invocation.
        output: i32,
        /// Sending half of the channel observed by the test.
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            // Send errors (e.g. a dropped receiver) are deliberately ignored.
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a [`DebuggingRecorder`] and returns its snapshotter.
    ///
    /// Panics if the recorder cannot be installed.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    /// Runs a metered execution with a channel-backed hook and checks the value it sent.
    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        // The execution result itself is not inspected by this test.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        // Check what the hook sent, if anything.
        match rx.try_recv() {
            Ok(actual_output) => assert_eq!(actual_output, expected_output),
            Err(_) => {
                // NOTE(review): a missing message is silently accepted, so this test cannot
                // fail when the hook is never invoked. `MockExecutor::finish` does call the
                // installed hook, so an unconditional assertion may be possible — confirm that
                // `MeteredStateHook` forwards to its inner hook before tightening this.
            }
        }
    }

    /// Runs a metered execution and checks that `sync.execution*` metrics show up in a snapshot.
    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Touch a few executor metrics up front so they exist before the execution runs.
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // Mock state with a single account that owns one storage slot.
        let state = {
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        // The execution result itself is not inspected by this test.
        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        let snapshot = snapshotter.snapshot().into_vec();

        // Only the presence of a metric with the expected name prefix is checked, not its value.
        let mut found_metrics = false;
        for (key, _unit, _desc, _value) in snapshot {
            let metric_name = key.key().name();
            if metric_name.starts_with("sync.execution") {
                found_metrics = true;
                break;
            }
        }

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}