1use crate::{execute::Executor, Database, OnStateHook};
6use alloy_consensus::BlockHeader;
7use alloy_evm::block::StateChangeSource;
8use metrics::{Counter, Gauge, Histogram};
9use reth_execution_types::BlockExecutionOutput;
10use reth_metrics::Metrics;
11use reth_primitives_traits::{NodePrimitives, RecoveredBlock};
12use revm::state::EvmState;
13use std::time::Instant;
14
15struct MeteredStateHook {
17 metrics: ExecutorMetrics,
18 inner_hook: Box<dyn OnStateHook>,
19}
20
21impl OnStateHook for MeteredStateHook {
22 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
23 let accounts = state.keys().len();
25 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
26 let bytecodes = state
27 .values()
28 .filter(|account| !account.info.is_empty_code_hash())
29 .collect::<Vec<_>>()
30 .len();
31
32 self.metrics.accounts_loaded_histogram.record(accounts as f64);
33 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
34 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
35
36 self.inner_hook.on_state(source, state);
38 }
39}
40
41#[derive(Metrics, Clone)]
44#[metrics(scope = "sync.execution")]
45pub struct ExecutorMetrics {
46 pub gas_processed_total: Counter,
48 pub gas_per_second: Gauge,
50
51 pub execution_histogram: Histogram,
53 pub execution_duration: Gauge,
55
56 pub accounts_loaded_histogram: Histogram,
58 pub storage_slots_loaded_histogram: Histogram,
60 pub bytecodes_loaded_histogram: Histogram,
62
63 pub accounts_updated_histogram: Histogram,
65 pub storage_slots_updated_histogram: Histogram,
67 pub bytecodes_updated_histogram: Histogram,
69}
70
71impl ExecutorMetrics {
72 fn metered<F, R, B>(&self, block: &RecoveredBlock<B>, f: F) -> R
73 where
74 F: FnOnce() -> R,
75 B: reth_primitives_traits::Block,
76 {
77 let execute_start = Instant::now();
79 let output = f();
80 let execution_duration = execute_start.elapsed().as_secs_f64();
81
82 self.gas_processed_total.increment(block.header().gas_used());
84 self.gas_per_second.set(block.header().gas_used() as f64 / execution_duration);
85 self.execution_histogram.record(execution_duration);
86 self.execution_duration.set(execution_duration);
87
88 output
89 }
90
91 pub fn execute_metered<E, DB>(
99 &self,
100 executor: E,
101 input: &RecoveredBlock<<E::Primitives as NodePrimitives>::Block>,
102 state_hook: Box<dyn OnStateHook>,
103 ) -> Result<BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>, E::Error>
104 where
105 DB: Database,
106 E: Executor<DB>,
107 {
108 let wrapper = MeteredStateHook { metrics: self.clone(), inner_hook: state_hook };
112
113 let output = self.metered(input, || executor.execute_with_state_hook(input, wrapper))?;
115
116 let accounts = output.state.state.len();
118 let storage_slots =
119 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
120 let bytecodes = output.state.contracts.len();
121
122 self.accounts_updated_histogram.record(accounts as f64);
123 self.storage_slots_updated_histogram.record(storage_slots as f64);
124 self.bytecodes_updated_histogram.record(bytecodes as f64);
125
126 Ok(output)
127 }
128
129 pub fn metered_one<F, R, B>(&self, input: &RecoveredBlock<B>, f: F) -> R
131 where
132 F: FnOnce(&RecoveredBlock<B>) -> R,
133 B: reth_primitives_traits::Block,
134 {
135 self.metered(input, || f(input))
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use alloy_eips::eip7685::Requests;
143 use alloy_primitives::{B256, U256};
144 use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
145 use reth_ethereum_primitives::EthPrimitives;
146 use reth_execution_types::BlockExecutionResult;
147 use revm::{
148 database_interface::EmptyDB,
149 state::{Account, AccountInfo, AccountStatus, EvmStorage, EvmStorageSlot},
150 };
151 use revm_database::State;
152 use std::sync::mpsc;
153
154 struct MockExecutor {
156 state: EvmState,
157 }
158
159 impl<DB: Database + Default> Executor<DB> for MockExecutor {
160 type Primitives = EthPrimitives;
161 type Error = std::convert::Infallible;
162
163 fn execute_one(
164 &mut self,
165 _block: &RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>,
166 ) -> Result<BlockExecutionResult<<Self::Primitives as NodePrimitives>::Receipt>, Self::Error>
167 {
168 Ok(BlockExecutionResult {
169 receipts: vec![],
170 requests: Requests::default(),
171 gas_used: 0,
172 })
173 }
174
175 fn execute_one_with_state_hook<F>(
176 &mut self,
177 _block: &RecoveredBlock<<Self::Primitives as NodePrimitives>::Block>,
178 mut hook: F,
179 ) -> Result<BlockExecutionResult<<Self::Primitives as NodePrimitives>::Receipt>, Self::Error>
180 where
181 F: OnStateHook + 'static,
182 {
183 hook.on_state(StateChangeSource::Transaction(0), &self.state);
185
186 Ok(BlockExecutionResult {
187 receipts: vec![],
188 requests: Requests::default(),
189 gas_used: 0,
190 })
191 }
192
193 fn into_state(self) -> revm_database::State<DB> {
194 State::builder().with_database(Default::default()).build()
195 }
196
197 fn size_hint(&self) -> usize {
198 0
199 }
200 }
201
/// Test hook that signals each invocation by sending a fixed value over a channel.
struct ChannelStateHook {
    /// The value sent on every `on_state` call.
    output: i32,
    /// Sending half of the channel observed by the test.
    sender: mpsc::Sender<i32>,
}
206
207 impl OnStateHook for ChannelStateHook {
208 fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
209 let _ = self.sender.send(self.output);
210 }
211 }
212
213 fn setup_test_recorder() -> Snapshotter {
214 let recorder = DebuggingRecorder::new();
215 let snapshotter = recorder.snapshotter();
216 recorder.install().unwrap();
217 snapshotter
218 }
219
/// Verifies that executing through [`ExecutorMetrics::execute_metered`] records a positive
/// sample in each of the three `*_loaded_histogram` metrics.
#[test]
fn test_executor_metrics_hook_metrics_recorded() {
    /// Fully-qualified names of the histograms updated by the metered state hook.
    const LOADED_HISTOGRAMS: [&str; 3] = [
        "sync.execution.accounts_loaded_histogram",
        "sync.execution.storage_slots_loaded_histogram",
        "sync.execution.bytecodes_loaded_histogram",
    ];

    // The recorder must be installed before the metrics handle is created.
    let snapshotter = setup_test_recorder();
    let metrics = ExecutorMetrics::default();
    let input = RecoveredBlock::default();

    let (tx, _rx) = mpsc::channel();
    let expected_output = 42;
    let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

    // One account with one storage slot and a non-empty code hash, so that every
    // "loaded" histogram receives a strictly positive sample.
    let state = {
        let mut state = EvmState::default();
        let storage =
            EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2)))]);
        state.insert(
            Default::default(),
            Account {
                info: AccountInfo {
                    balance: U256::from(100),
                    nonce: 10,
                    code_hash: B256::random(),
                    code: Default::default(),
                },
                storage,
                status: AccountStatus::Loaded,
            },
        );
        state
    };
    let executor = MockExecutor { state };
    let _result = metrics.execute_metered::<_, EmptyDB>(executor, &input, state_hook).unwrap();

    let snapshot = snapshotter.snapshot().into_vec();

    // Track which expected metrics actually showed up, so the test cannot pass vacuously
    // when a metric is missing from the snapshot.
    let mut recorded: Vec<String> = Vec::new();
    for metric in snapshot {
        let metric_name = metric.0.key().name();
        if !LOADED_HISTOGRAMS.contains(&metric_name) {
            continue;
        }

        match metric.3 {
            DebugValue::Histogram(vs) => {
                assert!(
                    vs.iter().any(|v| v.into_inner() > 0.0),
                    "metric {metric_name} not recorded"
                );
                recorded.push(metric_name.to_owned());
            }
            _ => panic!("metric {metric_name} is not a histogram"),
        }
    }

    for expected in LOADED_HISTOGRAMS.iter() {
        assert!(
            recorded.iter().any(|name| name == expected),
            "metric {expected} missing from snapshot"
        );
    }
}
269
/// Verifies that the caller-supplied state hook is still invoked when execution goes through
/// [`ExecutorMetrics::execute_metered`].
#[test]
fn test_executor_metrics_hook_called() {
    let expected_output = 42;
    let (tx, rx) = mpsc::channel();
    let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

    let metrics = ExecutorMetrics::default();
    let input = RecoveredBlock::default();
    let executor = MockExecutor { state: EvmState::default() };

    let _result = metrics.execute_metered::<_, EmptyDB>(executor, &input, state_hook).unwrap();

    // The mock executor calls the hook exactly once, so a value must already be queued.
    let actual_output = rx.try_recv().unwrap();
    assert_eq!(actual_output, expected_output);
}
287}