1use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode, FastInstant as Instant};
7use reth_provider::{
8 AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9 StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::{
17 atomic::{AtomicU64, AtomicUsize, Ordering},
18 Arc,
19 },
20 time::Duration,
21};
22
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// A [`Duration`] accumulator that can be updated through a shared reference.
///
/// The running total is stored as a whole number of nanoseconds in a single [`AtomicU64`], so
/// it can represent at most `u64::MAX` nanoseconds (roughly 584 years).
#[derive(Debug, Default)]
pub(crate) struct AtomicDuration {
    /// Accumulated time, in nanoseconds.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns the time accumulated so far as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `Duration::from_nanos` performs the seconds / sub-second split for us.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds `duration` to the accumulated total.
    ///
    /// A `duration` that does not fit into `u64` nanoseconds is clamped to `u64::MAX`
    /// nanoseconds instead of overflowing during the conversion (which would panic in builds
    /// with overflow checks and silently wrap otherwise).
    pub(crate) fn add_duration(&self, duration: Duration) {
        // Equivalent to `Duration::as_nanos` without widening to `u128`, saturating at the
        // largest value the counter can hold.
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));

        // A single `fetch_add` keeps the hot path cheap; the counter itself wraps on overflow.
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
58
59#[derive(Debug)]
61pub struct InstrumentedStateProvider<S> {
62 state_provider: S,
64 metrics: StateProviderMetrics,
66 stats: Arc<StateProviderStats>,
68}
69
70impl<S> InstrumentedStateProvider<S>
71where
72 S: StateProvider,
73{
74 pub fn new(state_provider: S, source: &'static str) -> Self {
77 Self::with_stats(
78 state_provider,
79 StateProviderMetrics::with_source(source),
80 Arc::new(StateProviderStats::default()),
81 )
82 }
83
84 pub(crate) const fn with_stats(
86 state_provider: S,
87 metrics: StateProviderMetrics,
88 stats: Arc<StateProviderStats>,
89 ) -> Self {
90 Self { state_provider, metrics, stats }
91 }
92
93 pub fn stats(&self) -> Arc<StateProviderStats> {
95 Arc::clone(&self.stats)
96 }
97}
98
99#[derive(Metrics, Clone)]
101#[metrics(scope = "sync.state_provider")]
102pub(crate) struct StateProviderMetrics {
103 storage_fetch_latency: Histogram,
105
106 code_fetch_latency: Histogram,
108
109 account_fetch_latency: Histogram,
111
112 total_storage_fetch_latency: Histogram,
115
116 total_storage_fetch_latency_gauge: Gauge,
119
120 total_code_fetch_latency: Histogram,
123
124 total_code_fetch_latency_gauge: Gauge,
126
127 total_account_fetch_latency: Histogram,
130
131 total_account_fetch_latency_gauge: Gauge,
134}
135
136impl StateProviderMetrics {
137 pub(crate) fn with_source(source: &'static str) -> Self {
139 Self::new_with_labels(&[("source", source)])
140 }
141
142 pub(crate) fn record_totals(&self, stats: &StateProviderStats) {
144 let total_storage_fetch_latency = stats.total_storage_fetch_latency.duration();
145 self.total_storage_fetch_latency.record(total_storage_fetch_latency);
146 self.total_storage_fetch_latency_gauge.set(total_storage_fetch_latency.as_secs_f64());
147
148 let total_code_fetch_latency = stats.total_code_fetch_latency.duration();
149 self.total_code_fetch_latency.record(total_code_fetch_latency);
150 self.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
151
152 let total_account_fetch_latency = stats.total_account_fetch_latency.duration();
153 self.total_account_fetch_latency.record(total_account_fetch_latency);
154 self.total_account_fetch_latency_gauge.set(total_account_fetch_latency.as_secs_f64());
155 }
156}
157
158impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
159 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
160 let start = Instant::now();
161 let res = self.state_provider.basic_account(address);
162 let elapsed = start.elapsed();
163 self.metrics.account_fetch_latency.record(elapsed);
164 self.stats.total_account_fetches.fetch_add(1, Ordering::Relaxed);
165 self.stats.total_account_fetch_latency.add_duration(elapsed);
166 res
167 }
168}
169
170impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
171 fn storage(
172 &self,
173 account: Address,
174 storage_key: StorageKey,
175 ) -> ProviderResult<Option<StorageValue>> {
176 let start = Instant::now();
177 let res = self.state_provider.storage(account, storage_key);
178 let elapsed = start.elapsed();
179 self.metrics.storage_fetch_latency.record(elapsed);
180 self.stats.total_storage_fetches.fetch_add(1, Ordering::Relaxed);
181 self.stats.total_storage_fetch_latency.add_duration(elapsed);
182 res
183 }
184}
185
186impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
187 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
188 let start = Instant::now();
189 let res = self.state_provider.bytecode_by_hash(code_hash);
190 let elapsed = start.elapsed();
191 self.metrics.code_fetch_latency.record(elapsed);
192 self.stats.total_code_fetches.fetch_add(1, Ordering::Relaxed);
193 self.stats.total_code_fetch_latency.add_duration(elapsed);
194 self.stats.total_code_fetched_bytes.fetch_add(
195 res.as_ref()
196 .ok()
197 .and_then(|code| code.as_ref().map(|code| code.len()))
198 .unwrap_or_default(),
199 Ordering::Relaxed,
200 );
201 res
202 }
203}
204
205impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
206 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
207 self.state_provider.state_root(hashed_state)
208 }
209
210 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
211 self.state_provider.state_root_from_nodes(input)
212 }
213
214 fn state_root_with_updates(
215 &self,
216 hashed_state: HashedPostState,
217 ) -> ProviderResult<(B256, TrieUpdates)> {
218 self.state_provider.state_root_with_updates(hashed_state)
219 }
220
221 fn state_root_from_nodes_with_updates(
222 &self,
223 input: TrieInput,
224 ) -> ProviderResult<(B256, TrieUpdates)> {
225 self.state_provider.state_root_from_nodes_with_updates(input)
226 }
227}
228
229impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
230 fn proof(
231 &self,
232 input: TrieInput,
233 address: Address,
234 slots: &[B256],
235 ) -> ProviderResult<AccountProof> {
236 self.state_provider.proof(input, address, slots)
237 }
238
239 fn multiproof(
240 &self,
241 input: TrieInput,
242 targets: MultiProofTargets,
243 ) -> ProviderResult<MultiProof> {
244 self.state_provider.multiproof(input, targets)
245 }
246
247 fn witness(
248 &self,
249 input: TrieInput,
250 target: HashedPostState,
251 mode: reth_trie::ExecutionWitnessMode,
252 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
253 self.state_provider.witness(input, target, mode)
254 }
255}
256
257impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
258 fn storage_root(
259 &self,
260 address: Address,
261 hashed_storage: HashedStorage,
262 ) -> ProviderResult<B256> {
263 self.state_provider.storage_root(address, hashed_storage)
264 }
265
266 fn storage_proof(
267 &self,
268 address: Address,
269 slot: B256,
270 hashed_storage: HashedStorage,
271 ) -> ProviderResult<StorageProof> {
272 self.state_provider.storage_proof(address, slot, hashed_storage)
273 }
274
275 fn storage_multiproof(
276 &self,
277 address: Address,
278 slots: &[B256],
279 hashed_storage: HashedStorage,
280 ) -> ProviderResult<StorageMultiProof> {
281 self.state_provider.storage_multiproof(address, slots, hashed_storage)
282 }
283}
284
285impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
286 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
287 self.state_provider.block_hash(number)
288 }
289
290 fn canonical_hashes_range(
291 &self,
292 start: alloy_primitives::BlockNumber,
293 end: alloy_primitives::BlockNumber,
294 ) -> ProviderResult<Vec<B256>> {
295 self.state_provider.canonical_hashes_range(start, end)
296 }
297}
298
299impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
300 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
301 self.state_provider.hashed_post_state(bundle_state)
302 }
303}
304
305#[derive(Debug, Default)]
309pub struct StateProviderStats {
310 total_storage_fetches: AtomicUsize,
311 total_storage_fetch_latency: AtomicDuration,
312
313 total_code_fetches: AtomicUsize,
314 total_code_fetch_latency: AtomicDuration,
315 total_code_fetched_bytes: AtomicUsize,
316
317 total_account_fetches: AtomicUsize,
318 total_account_fetch_latency: AtomicDuration,
319}
320
321impl StateProviderStats {
322 pub fn total_storage_fetches(&self) -> usize {
324 self.total_storage_fetches.load(Ordering::Relaxed)
325 }
326
327 pub fn total_storage_fetch_latency(&self) -> Duration {
329 self.total_storage_fetch_latency.duration()
330 }
331
332 pub fn total_code_fetches(&self) -> usize {
334 self.total_code_fetches.load(Ordering::Relaxed)
335 }
336
337 pub fn total_code_fetch_latency(&self) -> Duration {
339 self.total_code_fetch_latency.duration()
340 }
341
342 pub fn total_code_fetched_bytes(&self) -> usize {
344 self.total_code_fetched_bytes.load(Ordering::Relaxed)
345 }
346
347 pub fn total_account_fetches(&self) -> usize {
349 self.total_account_fetches.load(Ordering::Relaxed)
350 }
351
352 pub fn total_account_fetch_latency(&self) -> Duration {
354 self.total_account_fetch_latency.duration()
355 }
356}