1use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode, FastInstant as Instant};
7use reth_provider::{
8 AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9 StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::{
17 atomic::{AtomicU64, AtomicUsize, Ordering},
18 Arc,
19 },
20 time::Duration,
21};
22
/// Number of nanoseconds in one second, used to convert whole seconds to nanoseconds.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// A [`Duration`] accumulator that can be updated through a shared reference.
///
/// The running total is kept as a single nanosecond counter inside an [`AtomicU64`], so it can
/// represent roughly 584 years of accumulated time.
#[derive(Debug, Default)]
pub(crate) struct AtomicDuration {
    /// Accumulated time, in nanoseconds.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns the accumulated total as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `Duration::from_nanos` performs the seconds / sub-second split for us.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds `duration` to the accumulated total.
    ///
    /// A `duration` that does not fit into `u64` nanoseconds is clamped to `u64::MAX`
    /// nanoseconds rather than overflowing during the conversion (which would panic in debug
    /// builds and silently wrap in release builds).
    pub(crate) fn add_duration(&self, duration: Duration) {
        // Equivalent to `Duration::as_nanos` without widening to `u128`; saturating arithmetic
        // keeps absurdly large inputs from overflowing the intermediate product or sum.
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));

        // NOTE: the accumulation itself still wraps if the running total exceeds `u64::MAX`
        // nanoseconds; a single relaxed `fetch_add` is kept so the hot path stays cheap.
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
58
59#[derive(Debug)]
61pub struct InstrumentedStateProvider<S> {
62 state_provider: S,
64 metrics: StateProviderMetrics,
66 stats: Arc<StateProviderStats>,
68}
69
70impl<S> InstrumentedStateProvider<S>
71where
72 S: StateProvider,
73{
74 pub fn new(state_provider: S, source: &'static str) -> Self {
77 Self {
78 state_provider,
79 metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),
80 stats: Arc::new(StateProviderStats::default()),
81 }
82 }
83
84 pub fn stats(&self) -> Arc<StateProviderStats> {
86 Arc::clone(&self.stats)
87 }
88}
89
90impl<S> Drop for InstrumentedStateProvider<S> {
91 fn drop(&mut self) {
92 let total_storage_fetch_latency = self.stats.total_storage_fetch_latency.duration();
93 self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
94 self.metrics
95 .total_storage_fetch_latency_gauge
96 .set(total_storage_fetch_latency.as_secs_f64());
97
98 let total_code_fetch_latency = self.stats.total_code_fetch_latency.duration();
99 self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
100 self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
101
102 let total_account_fetch_latency = self.stats.total_account_fetch_latency.duration();
103 self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
104 self.metrics
105 .total_account_fetch_latency_gauge
106 .set(total_account_fetch_latency.as_secs_f64());
107 }
108}
109
110#[derive(Metrics, Clone)]
112#[metrics(scope = "sync.state_provider")]
113pub(crate) struct StateProviderMetrics {
114 storage_fetch_latency: Histogram,
116
117 code_fetch_latency: Histogram,
119
120 account_fetch_latency: Histogram,
122
123 total_storage_fetch_latency: Histogram,
126
127 total_storage_fetch_latency_gauge: Gauge,
130
131 total_code_fetch_latency: Histogram,
134
135 total_code_fetch_latency_gauge: Gauge,
137
138 total_account_fetch_latency: Histogram,
141
142 total_account_fetch_latency_gauge: Gauge,
145}
146
147impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
148 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
149 let start = Instant::now();
150 let res = self.state_provider.basic_account(address);
151 let elapsed = start.elapsed();
152 self.metrics.account_fetch_latency.record(elapsed);
153 self.stats.total_account_fetches.fetch_add(1, Ordering::Relaxed);
154 self.stats.total_account_fetch_latency.add_duration(elapsed);
155 res
156 }
157}
158
159impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
160 fn storage(
161 &self,
162 account: Address,
163 storage_key: StorageKey,
164 ) -> ProviderResult<Option<StorageValue>> {
165 let start = Instant::now();
166 let res = self.state_provider.storage(account, storage_key);
167 let elapsed = start.elapsed();
168 self.metrics.storage_fetch_latency.record(elapsed);
169 self.stats.total_storage_fetches.fetch_add(1, Ordering::Relaxed);
170 self.stats.total_storage_fetch_latency.add_duration(elapsed);
171 res
172 }
173}
174
175impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
176 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
177 let start = Instant::now();
178 let res = self.state_provider.bytecode_by_hash(code_hash);
179 let elapsed = start.elapsed();
180 self.metrics.code_fetch_latency.record(elapsed);
181 self.stats.total_code_fetches.fetch_add(1, Ordering::Relaxed);
182 self.stats.total_code_fetch_latency.add_duration(elapsed);
183 self.stats.total_code_fetched_bytes.fetch_add(
184 res.as_ref()
185 .ok()
186 .and_then(|code| code.as_ref().map(|code| code.len()))
187 .unwrap_or_default(),
188 Ordering::Relaxed,
189 );
190 res
191 }
192}
193
194impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
195 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
196 self.state_provider.state_root(hashed_state)
197 }
198
199 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
200 self.state_provider.state_root_from_nodes(input)
201 }
202
203 fn state_root_with_updates(
204 &self,
205 hashed_state: HashedPostState,
206 ) -> ProviderResult<(B256, TrieUpdates)> {
207 self.state_provider.state_root_with_updates(hashed_state)
208 }
209
210 fn state_root_from_nodes_with_updates(
211 &self,
212 input: TrieInput,
213 ) -> ProviderResult<(B256, TrieUpdates)> {
214 self.state_provider.state_root_from_nodes_with_updates(input)
215 }
216}
217
218impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
219 fn proof(
220 &self,
221 input: TrieInput,
222 address: Address,
223 slots: &[B256],
224 ) -> ProviderResult<AccountProof> {
225 self.state_provider.proof(input, address, slots)
226 }
227
228 fn multiproof(
229 &self,
230 input: TrieInput,
231 targets: MultiProofTargets,
232 ) -> ProviderResult<MultiProof> {
233 self.state_provider.multiproof(input, targets)
234 }
235
236 fn witness(
237 &self,
238 input: TrieInput,
239 target: HashedPostState,
240 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
241 self.state_provider.witness(input, target)
242 }
243}
244
245impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
246 fn storage_root(
247 &self,
248 address: Address,
249 hashed_storage: HashedStorage,
250 ) -> ProviderResult<B256> {
251 self.state_provider.storage_root(address, hashed_storage)
252 }
253
254 fn storage_proof(
255 &self,
256 address: Address,
257 slot: B256,
258 hashed_storage: HashedStorage,
259 ) -> ProviderResult<StorageProof> {
260 self.state_provider.storage_proof(address, slot, hashed_storage)
261 }
262
263 fn storage_multiproof(
264 &self,
265 address: Address,
266 slots: &[B256],
267 hashed_storage: HashedStorage,
268 ) -> ProviderResult<StorageMultiProof> {
269 self.state_provider.storage_multiproof(address, slots, hashed_storage)
270 }
271}
272
273impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
274 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
275 self.state_provider.block_hash(number)
276 }
277
278 fn canonical_hashes_range(
279 &self,
280 start: alloy_primitives::BlockNumber,
281 end: alloy_primitives::BlockNumber,
282 ) -> ProviderResult<Vec<B256>> {
283 self.state_provider.canonical_hashes_range(start, end)
284 }
285}
286
287impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
288 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
289 self.state_provider.hashed_post_state(bundle_state)
290 }
291}
292
293#[derive(Debug, Default)]
297pub struct StateProviderStats {
298 total_storage_fetches: AtomicUsize,
299 total_storage_fetch_latency: AtomicDuration,
300
301 total_code_fetches: AtomicUsize,
302 total_code_fetch_latency: AtomicDuration,
303 total_code_fetched_bytes: AtomicUsize,
304
305 total_account_fetches: AtomicUsize,
306 total_account_fetch_latency: AtomicDuration,
307}
308
309impl StateProviderStats {
310 pub fn total_storage_fetches(&self) -> usize {
312 self.total_storage_fetches.load(Ordering::Relaxed)
313 }
314
315 pub fn total_storage_fetch_latency(&self) -> Duration {
317 self.total_storage_fetch_latency.duration()
318 }
319
320 pub fn total_code_fetches(&self) -> usize {
322 self.total_code_fetches.load(Ordering::Relaxed)
323 }
324
325 pub fn total_code_fetch_latency(&self) -> Duration {
327 self.total_code_fetch_latency.duration()
328 }
329
330 pub fn total_code_fetched_bytes(&self) -> usize {
332 self.total_code_fetched_bytes.load(Ordering::Relaxed)
333 }
334
335 pub fn total_account_fetches(&self) -> usize {
337 self.total_account_fetches.load(Ordering::Relaxed)
338 }
339
340 pub fn total_account_fetch_latency(&self) -> Duration {
342 self.total_account_fetch_latency.duration()
343 }
344}