1use crate::tree::MeteredStateHook;
2use alloy_evm::{
3 block::{BlockExecutor, ExecutableTx},
4 Evm,
5};
6use core::borrow::BorrowMut;
7use reth_errors::BlockExecutionError;
8use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
9use reth_execution_types::BlockExecutionOutput;
10use reth_metrics::{
11 metrics::{Counter, Gauge, Histogram},
12 Metrics,
13};
14use reth_primitives_traits::SignedTransaction;
15use reth_trie::updates::TrieUpdates;
16use revm::database::{states::bundle_state::BundleRetention, State};
17use std::time::Instant;
18use tracing::{debug_span, trace};
19
/// Aggregates the metric groups declared in this file together with the block executor metrics.
#[derive(Debug, Default)]
pub(crate) struct EngineApiMetrics {
    /// Engine metrics, see [`EngineMetrics`].
    pub(crate) engine: EngineMetrics,
    /// Block executor metrics; updated by `metered` and `execute_metered`.
    pub(crate) executor: ExecutorMetrics,
    /// Block validation metrics, see [`BlockValidationMetrics`].
    pub(crate) block_validation: BlockValidationMetrics,
    /// Blockchain tree metrics, see [`TreeMetrics`].
    pub tree: TreeMetrics,
}
32
33impl EngineApiMetrics {
34 fn metered<F, R>(&self, f: F) -> R
36 where
37 F: FnOnce() -> (u64, R),
38 {
39 let execute_start = Instant::now();
41 let (gas_used, output) = f();
42 let execution_duration = execute_start.elapsed().as_secs_f64();
43
44 self.executor.gas_processed_total.increment(gas_used);
46 self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
47 self.executor.gas_used_histogram.record(gas_used as f64);
48 self.executor.execution_histogram.record(execution_duration);
49 self.executor.execution_duration.set(execution_duration);
50
51 output
52 }
53
54 pub(crate) fn execute_metered<E, DB>(
60 &self,
61 executor: E,
62 transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
63 state_hook: Box<dyn OnStateHook>,
64 ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
65 where
66 DB: alloy_evm::Database,
67 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>, Transaction: SignedTransaction>,
68 {
69 let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
73
74 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
75
76 let f = || {
77 executor.apply_pre_execution_changes()?;
78 for tx in transactions {
79 let tx = tx?;
80 let span =
81 debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
82 let _enter = span.enter();
83 trace!(target: "engine::tree", "Executing transaction");
84 executor.execute_transaction(tx)?;
85 }
86 executor.finish().map(|(evm, result)| (evm.into_db(), result))
87 };
88
89 let (mut db, result) = self.metered(|| {
91 let res = f();
92 let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
93 (gas_used, res)
94 })?;
95
96 db.borrow_mut().merge_transitions(BundleRetention::Reverts);
98 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
99
100 let accounts = output.state.state.len();
102 let storage_slots =
103 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
104 let bytecodes = output.state.contracts.len();
105
106 self.executor.accounts_updated_histogram.record(accounts as f64);
107 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
108 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
109
110 Ok(output)
111 }
112}
113
/// Metrics registered under the `blockchain_tree` scope.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree")]
pub(crate) struct TreeMetrics {
    /// Height of the canonical chain.
    pub canonical_chain_height: Gauge,
    /// Number of reorgs.
    pub reorgs: Counter,
    /// Depth of the latest reorg.
    pub latest_reorg_depth: Gauge,
}
125
// NOTE(review): none of these fields are updated in the visible part of this file; the
// descriptions below restate the field names and should be confirmed against the call sites.
/// Metrics registered under the `consensus.engine.beacon` scope.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineMetrics {
    /// Gauge of executed blocks.
    pub(crate) executed_blocks: Gauge,
    /// Number of inserted blocks that were already executed.
    pub(crate) inserted_already_executed_blocks: Counter,
    /// Number of pipeline runs.
    pub(crate) pipeline_runs: Counter,
    /// Number of forkchoice updated messages.
    pub(crate) forkchoice_updated_messages: Counter,
    /// Number of forkchoice updated messages that carried attributes.
    pub(crate) forkchoice_with_attributes_updated_messages: Counter,
    /// Number of cache misses for executed new blocks.
    pub(crate) executed_new_block_cache_miss: Counter,
    /// Number of new payload messages.
    pub(crate) new_payload_messages: Counter,
    /// Histogram of persistence durations.
    pub(crate) persistence_duration: Histogram,
    /// Number of new payload responses that failed to be delivered.
    pub(crate) failed_new_payload_response_deliveries: Counter,
    /// Number of forkchoice updated responses that failed to be delivered.
    pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
    /// Histogram of total block insert durations.
    pub(crate) block_insert_total_duration: Histogram,
}
157
// NOTE(review): fields not touched by `record_state_root` / `record_payload_validation` are
// described from their names only; confirm against the call sites that update them.
/// Metrics registered under the `sync.block_validation` scope.
#[derive(Metrics)]
#[metrics(scope = "sync.block_validation")]
pub(crate) struct BlockValidationMetrics {
    /// Total number of storage tries updated, accumulated by `record_state_root`.
    pub(crate) state_root_storage_tries_updated_total: Counter,
    /// Number of parallel state root fallbacks.
    pub(crate) state_root_parallel_fallback_total: Counter,
    /// Latest state root duration in seconds, set by `record_state_root`.
    pub(crate) state_root_duration: Gauge,
    /// Histogram of state root durations in seconds, recorded by `record_state_root`.
    pub(crate) state_root_histogram: Histogram,
    /// Histogram of trie input durations.
    pub(crate) trie_input_duration: Histogram,
    /// Latest payload validation duration in seconds, set by `record_payload_validation`.
    pub(crate) payload_validation_duration: Gauge,
    /// Histogram of payload validation durations in seconds.
    pub(crate) payload_validation_histogram: Histogram,
    /// Histogram for spawning the payload processor.
    pub(crate) spawn_payload_processor: Histogram,
    /// Histogram of post-execution validation durations.
    pub(crate) post_execution_validation_duration: Histogram,
    /// Histogram of the overall duration (the measured span is defined by the recording site).
    pub(crate) total_duration: Histogram,
}
183
184impl BlockValidationMetrics {
185 pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
187 self.state_root_storage_tries_updated_total
188 .increment(trie_output.storage_tries_ref().len() as u64);
189 self.state_root_duration.set(elapsed_as_secs);
190 self.state_root_histogram.record(elapsed_as_secs);
191 }
192
193 pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
196 self.payload_validation_duration.set(elapsed_as_secs);
197 self.payload_validation_histogram.record(elapsed_as_secs);
198 }
199}
200
/// Metrics registered under the `blockchain_tree.block_buffer` scope.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.block_buffer")]
pub(crate) struct BlockBufferMetrics {
    /// Gauge of blocks in the block buffer.
    pub blocks: Gauge,
}
208
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::StateChangeSource;
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::{ExecutionResult, Output, ResultAndState, SuccessReason},
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use revm_primitives::Bytes;
    use std::sync::mpsc;

    /// Gas figure the mock executor reports wherever a gas amount is required.
    const MOCK_GAS_USED: u64 = 1000;

    /// [`BlockExecutor`] stand-in that runs no EVM code and only notifies its state hook.
    struct MockExecutor {
        /// State passed to the hook on every notification.
        state: EvmState,
        /// Hook installed through `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        /// Creates a mock that reports `state` and starts without a hook.
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    /// EVM type returned by [`MockExecutor::finish`].
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        /// Nothing to prepare; always succeeds.
        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        /// Notifies the hook (when present) and returns a canned successful result.
        fn execute_transaction_without_commit(
            &mut self,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<ResultAndState<<Self::Evm as Evm>::HaltReason>, BlockExecutionError> {
            if let Some(hook) = &mut self.hook {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            let result = ExecutionResult::Success {
                reason: SuccessReason::Return,
                gas_used: MOCK_GAS_USED,
                gas_refunded: 0,
                logs: vec![],
                output: Output::Call(Bytes::from(vec![])),
            };
            Ok(ResultAndState::new(result, Default::default()))
        }

        /// Reports the fixed mock gas amount without committing anything.
        fn commit_transaction(
            &mut self,
            _output: ResultAndState<<Self::Evm as Evm>::HaltReason>,
            _tx: impl ExecutableTx<Self>,
        ) -> Result<u64, BlockExecutionError> {
            Ok(MOCK_GAS_USED)
        }

        /// Notifies the hook one last time, then returns a fresh EVM over an empty database
        /// together with an empty execution result carrying the mock gas amount.
        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            if let Some(mut hook) = self.hook {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }

            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let context = Context::mainnet().with_db(db);
            let evm = EthEvm::new(context.build_mainnet_with_inspector(NoOpInspector {}), false);

            let result = BlockExecutionResult {
                receipts: vec![],
                requests: Requests::default(),
                gas_used: MOCK_GAS_USED,
            };
            Ok((evm, result))
        }

        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        /// Not supported by the mock; panics when called.
        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        /// Not supported by the mock; panics when called.
        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends a fixed value over a channel each time it is invoked.
    struct ChannelStateHook {
        /// Value sent on every invocation.
        output: i32,
        /// Sending half of the channel observed by the test.
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            // A send error is deliberately ignored.
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a [`DebuggingRecorder`] and returns the snapshotter used to inspect it.
    ///
    /// Panics if the recorder cannot be installed.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let expected_output = 42;
        let (tx, rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let executor = MockExecutor::new(EvmState::default());

        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        // A missing message is tolerated: the test only requires that a delivered value is the
        // one the hook was configured with.
        if let Ok(actual_output) = rx.try_recv() {
            assert_eq!(actual_output, expected_output);
        }
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Touch a few executor metrics up front so they show up in the snapshot.
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // One account with a single storage slot is enough state for the hook to report.
        let storage =
            EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
        let account = Account {
            info: AccountInfo {
                balance: U256::from(100),
                nonce: 10,
                code_hash: B256::random(),
                code: Default::default(),
            },
            storage,
            status: AccountStatus::default(),
            transaction_id: 0,
        };
        let mut state = EvmState::default();
        state.insert(Default::default(), account);

        let executor = MockExecutor::new(state);

        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        let found_metrics = snapshotter
            .snapshot()
            .into_vec()
            .into_iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}