1use crate::tree::MeteredStateHook;
2use alloy_evm::{
3 block::{BlockExecutor, ExecutableTx},
4 Evm,
5};
6use core::borrow::BorrowMut;
7use reth_errors::BlockExecutionError;
8use reth_evm::{metrics::ExecutorMetrics, OnStateHook};
9use reth_execution_types::BlockExecutionOutput;
10use reth_metrics::{
11 metrics::{Counter, Gauge, Histogram},
12 Metrics,
13};
14use reth_trie::updates::TrieUpdates;
15use revm::database::{states::bundle_state::BundleRetention, State};
16use std::time::Instant;
17
18#[derive(Debug, Default)]
20pub(crate) struct EngineApiMetrics {
21 pub(crate) engine: EngineMetrics,
23 pub(crate) executor: ExecutorMetrics,
25 pub(crate) block_validation: BlockValidationMetrics,
27 pub tree: TreeMetrics,
29}
30
31impl EngineApiMetrics {
32 fn metered<F, R>(&self, f: F) -> R
34 where
35 F: FnOnce() -> (u64, R),
36 {
37 let execute_start = Instant::now();
39 let (gas_used, output) = f();
40 let execution_duration = execute_start.elapsed().as_secs_f64();
41
42 self.executor.gas_processed_total.increment(gas_used);
44 self.executor.gas_per_second.set(gas_used as f64 / execution_duration);
45 self.executor.gas_used_histogram.record(gas_used as f64);
46 self.executor.execution_histogram.record(execution_duration);
47 self.executor.execution_duration.set(execution_duration);
48
49 output
50 }
51
52 pub(crate) fn execute_metered<E, DB>(
58 &self,
59 executor: E,
60 transactions: impl Iterator<Item = Result<impl ExecutableTx<E>, BlockExecutionError>>,
61 state_hook: Box<dyn OnStateHook>,
62 ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
63 where
64 DB: alloy_evm::Database,
65 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>>,
66 {
67 let wrapper = MeteredStateHook { metrics: self.executor.clone(), inner_hook: state_hook };
71
72 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
73
74 let f = || {
75 executor.apply_pre_execution_changes()?;
76 for tx in transactions {
77 executor.execute_transaction(tx?)?;
78 }
79 executor.finish().map(|(evm, result)| (evm.into_db(), result))
80 };
81
82 let (mut db, result) = self.metered(|| {
84 let res = f();
85 let gas_used = res.as_ref().map(|r| r.1.gas_used).unwrap_or(0);
86 (gas_used, res)
87 })?;
88
89 db.borrow_mut().merge_transitions(BundleRetention::Reverts);
91 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
92
93 let accounts = output.state.state.len();
95 let storage_slots =
96 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
97 let bytecodes = output.state.contracts.len();
98
99 self.executor.accounts_updated_histogram.record(accounts as f64);
100 self.executor.storage_slots_updated_histogram.record(storage_slots as f64);
101 self.executor.bytecodes_updated_histogram.record(bytecodes as f64);
102
103 Ok(output)
104 }
105}
106
107#[derive(Metrics)]
109#[metrics(scope = "blockchain_tree")]
110pub(crate) struct TreeMetrics {
111 pub canonical_chain_height: Gauge,
113 pub reorgs: Counter,
115 pub latest_reorg_depth: Gauge,
117}
118
119#[derive(Metrics)]
121#[metrics(scope = "consensus.engine.beacon")]
122pub(crate) struct EngineMetrics {
123 pub(crate) executed_blocks: Gauge,
125 pub(crate) inserted_already_executed_blocks: Counter,
127 pub(crate) pipeline_runs: Counter,
129 pub(crate) forkchoice_updated_messages: Counter,
131 pub(crate) forkchoice_with_attributes_updated_messages: Counter,
133 pub(crate) executed_new_block_cache_miss: Counter,
135 pub(crate) new_payload_messages: Counter,
137 pub(crate) persistence_duration: Histogram,
139 pub(crate) failed_new_payload_response_deliveries: Counter,
145 pub(crate) failed_forkchoice_updated_response_deliveries: Counter,
147 pub(crate) block_insert_total_duration: Histogram,
149}
150
151#[derive(Metrics)]
153#[metrics(scope = "sync.block_validation")]
154pub(crate) struct BlockValidationMetrics {
155 pub(crate) state_root_storage_tries_updated_total: Counter,
157 pub(crate) state_root_parallel_fallback_total: Counter,
159 pub(crate) state_root_histogram: Histogram,
161 pub(crate) state_root_duration: Gauge,
163 pub(crate) trie_input_duration: Histogram,
165 pub(crate) payload_validation_duration: Gauge,
167 pub(crate) payload_validation_histogram: Histogram,
169}
170
171impl BlockValidationMetrics {
172 pub(crate) fn record_state_root(&self, trie_output: &TrieUpdates, elapsed_as_secs: f64) {
174 self.state_root_storage_tries_updated_total
175 .increment(trie_output.storage_tries_ref().len() as u64);
176 self.state_root_duration.set(elapsed_as_secs);
177 self.state_root_histogram.record(elapsed_as_secs);
178 }
179
180 pub(crate) fn record_payload_validation(&self, elapsed_as_secs: f64) {
183 self.payload_validation_duration.set(elapsed_as_secs);
184 self.payload_validation_histogram.record(elapsed_as_secs);
185 }
186}
187
188#[derive(Metrics)]
190#[metrics(scope = "blockchain_tree.block_buffer")]
191pub(crate) struct BlockBufferMetrics {
192 pub blocks: Gauge,
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use alloy_evm::block::{CommitChanges, StateChangeSource};
    use alloy_primitives::{B256, U256};
    use metrics_util::debugging::{DebuggingRecorder, Snapshotter};
    use reth_ethereum_primitives::{Receipt, TransactionSigned};
    use reth_evm_ethereum::EthEvm;
    use reth_execution_types::BlockExecutionResult;
    use reth_primitives_traits::RecoveredBlock;
    use revm::{
        context::result::ExecutionResult,
        database::State,
        database_interface::EmptyDB,
        inspector::NoOpInspector,
        state::{Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot},
        Context, MainBuilder, MainContext,
    };
    use std::sync::mpsc;

    /// Executor stub that reports a fixed [`EvmState`] to the installed state hook.
    struct MockExecutor {
        /// State handed to the hook on every notification.
        state: EvmState,
        /// Hook installed through `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
    }

    impl MockExecutor {
        /// Creates a mock executor without a state hook.
        fn new(state: EvmState) -> Self {
            Self { state, hook: None }
        }
    }

    /// EVM type produced by [`MockExecutor::finish`].
    type MockEvm = EthEvm<State<EmptyDB>, NoOpInspector>;

    impl BlockExecutor for MockExecutor {
        type Transaction = TransactionSigned;
        type Receipt = Receipt;
        type Evm = MockEvm;

        fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
            Ok(())
        }

        /// Notifies the hook (if any) with the stored state and reports 1000 gas used.
        fn execute_transaction_with_commit_condition(
            &mut self,
            _tx: impl alloy_evm::block::ExecutableTx<Self>,
            _f: impl FnOnce(&ExecutionResult<<Self::Evm as Evm>::HaltReason>) -> CommitChanges,
        ) -> Result<Option<u64>, BlockExecutionError> {
            if let Some(hook) = self.hook.as_mut() {
                hook.on_state(StateChangeSource::Transaction(0), &self.state);
            }
            Ok(Some(1000))
        }

        /// Notifies the hook (if any) once, then returns a fresh EVM over an empty database and
        /// a result reporting 1000 gas used.
        fn finish(
            self,
        ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
            let Self { hook, state, .. } = self;

            // The hook is always notified here, even when no transaction was executed.
            if let Some(mut hook) = hook {
                hook.on_state(StateChangeSource::Transaction(0), &state);
            }

            let db = State::builder()
                .with_database(EmptyDB::default())
                .with_bundle_update()
                .without_state_clear()
                .build();
            let evm = EthEvm::new(
                Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
                false,
            );

            Ok((
                evm,
                BlockExecutionResult {
                    receipts: vec![],
                    requests: Requests::default(),
                    gas_used: 1000,
                },
            ))
        }

        fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
            self.hook = hook;
        }

        fn evm(&self) -> &Self::Evm {
            panic!("Mock executor evm() not implemented")
        }

        fn evm_mut(&mut self) -> &mut Self::Evm {
            panic!("Mock executor evm_mut() not implemented")
        }
    }

    /// State hook that sends a fixed value over a channel each time it is invoked.
    struct ChannelStateHook {
        /// Value sent on every notification.
        output: i32,
        /// Sending half of the channel observed by the test.
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
            // A closed receiver is not an error for the hook itself.
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a [`DebuggingRecorder`] as the global recorder and returns its snapshotter.
    ///
    /// A global recorder can only be installed once per process, so this must be called from a
    /// single test only.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().expect("a global metrics recorder is already installed");
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = EngineApiMetrics::default();
        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let state = EvmState::default();
        let executor = MockExecutor::new(state);

        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        // `MockExecutor::finish` always notifies its hook, and the `MeteredStateHook` wrapper
        // installed by `execute_metered` is expected to forward that notification, so a missing
        // message must fail the test instead of passing silently.
        let actual_output =
            rx.try_recv().expect("state hook was not invoked during metered execution");
        assert_eq!(actual_output, expected_output);
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = EngineApiMetrics::default();

        // Touch the metrics up front so they exist in the recorder.
        metrics.executor.gas_processed_total.increment(0);
        metrics.executor.gas_per_second.set(0.0);
        metrics.executor.gas_used_histogram.record(0.0);

        let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();

        let (tx, _rx) = mpsc::channel();
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: 42 });

        // One account with a single storage slot.
        let state = {
            let mut state = EvmState::default();
            let storage =
                EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2), 0))]);
            state.insert(
                Default::default(),
                Account {
                    info: AccountInfo {
                        balance: U256::from(100),
                        nonce: 10,
                        code_hash: B256::random(),
                        code: Default::default(),
                    },
                    storage,
                    status: AccountStatus::default(),
                    transaction_id: 0,
                },
            );
            state
        };

        let executor = MockExecutor::new(state);

        let _result = metrics.execute_metered::<_, EmptyDB>(
            executor,
            input.clone_transactions_recovered().map(Ok::<_, BlockExecutionError>),
            state_hook,
        );

        let snapshot = snapshotter.snapshot().into_vec();

        let found_metrics = snapshot
            .into_iter()
            .any(|(key, _unit, _desc, _value)| key.key().name().starts_with("sync.execution"));

        assert!(found_metrics, "Expected to find sync.execution metrics");
    }
}