1use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8 AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9 StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::atomic::{AtomicU64, Ordering},
17 time::{Duration, Instant},
18};
19
20const NANOS_PER_SEC: u32 = 1_000_000_000;
22
23#[derive(Debug, Default)]
26pub(crate) struct AtomicDuration {
27 nanos: AtomicU64,
33}
34
35impl AtomicDuration {
36 pub(crate) const fn zero() -> Self {
38 Self { nanos: AtomicU64::new(0) }
39 }
40
41 pub(crate) fn duration(&self) -> Duration {
43 let nanos = self.nanos.load(Ordering::Relaxed);
44 let seconds = nanos / NANOS_PER_SEC as u64;
45 let nanos = nanos % NANOS_PER_SEC as u64;
46 Duration::new(seconds, nanos as u32)
48 }
49
50 pub(crate) fn add_duration(&self, duration: Duration) {
52 let total_nanos =
55 duration.as_secs() * NANOS_PER_SEC as u64 + duration.subsec_nanos() as u64;
56 self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
58 }
59}
60
61#[derive(Debug)]
63pub struct InstrumentedStateProvider<S> {
64 state_provider: S,
66
67 metrics: StateProviderMetrics,
69
70 total_storage_fetch_latency: AtomicDuration,
72
73 total_code_fetch_latency: AtomicDuration,
75
76 total_account_fetch_latency: AtomicDuration,
78}
79
80impl<S> InstrumentedStateProvider<S>
81where
82 S: StateProvider,
83{
84 pub fn new(state_provider: S, source: &'static str) -> Self {
87 Self {
88 state_provider,
89 metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),
90 total_storage_fetch_latency: AtomicDuration::zero(),
91 total_code_fetch_latency: AtomicDuration::zero(),
92 total_account_fetch_latency: AtomicDuration::zero(),
93 }
94 }
95}
96
97impl<S> InstrumentedStateProvider<S> {
98 fn record_storage_fetch(&self, latency: Duration) {
101 self.metrics.storage_fetch_latency.record(latency);
102 self.total_storage_fetch_latency.add_duration(latency);
103 }
104
105 fn record_code_fetch(&self, latency: Duration) {
108 self.metrics.code_fetch_latency.record(latency);
109 self.total_code_fetch_latency.add_duration(latency);
110 }
111
112 fn record_account_fetch(&self, latency: Duration) {
115 self.metrics.account_fetch_latency.record(latency);
116 self.total_account_fetch_latency.add_duration(latency);
117 }
118
119 pub(crate) fn record_total_latency(&self) {
121 let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
122 self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
123 self.metrics
124 .total_storage_fetch_latency_gauge
125 .set(total_storage_fetch_latency.as_secs_f64());
126
127 let total_code_fetch_latency = self.total_code_fetch_latency.duration();
128 self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
129 self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
130
131 let total_account_fetch_latency = self.total_account_fetch_latency.duration();
132 self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
133 self.metrics
134 .total_account_fetch_latency_gauge
135 .set(total_account_fetch_latency.as_secs_f64());
136 }
137}
138
139impl<S> Drop for InstrumentedStateProvider<S> {
140 fn drop(&mut self) {
141 self.record_total_latency();
142 }
143}
144
145#[derive(Metrics, Clone)]
147#[metrics(scope = "sync.state_provider")]
148pub(crate) struct StateProviderMetrics {
149 storage_fetch_latency: Histogram,
151
152 code_fetch_latency: Histogram,
154
155 account_fetch_latency: Histogram,
157
158 total_storage_fetch_latency: Histogram,
161
162 total_storage_fetch_latency_gauge: Gauge,
165
166 total_code_fetch_latency: Histogram,
169
170 total_code_fetch_latency_gauge: Gauge,
172
173 total_account_fetch_latency: Histogram,
176
177 total_account_fetch_latency_gauge: Gauge,
180}
181
182impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
183 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
184 let start = Instant::now();
185 let res = self.state_provider.basic_account(address);
186 self.record_account_fetch(start.elapsed());
187 res
188 }
189}
190
191impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
192 fn storage(
193 &self,
194 account: Address,
195 storage_key: StorageKey,
196 ) -> ProviderResult<Option<StorageValue>> {
197 let start = Instant::now();
198 let res = self.state_provider.storage(account, storage_key);
199 self.record_storage_fetch(start.elapsed());
200 res
201 }
202
203 fn storage_by_hashed_key(
204 &self,
205 address: Address,
206 hashed_storage_key: StorageKey,
207 ) -> ProviderResult<Option<StorageValue>> {
208 let start = Instant::now();
209 let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
210 self.record_storage_fetch(start.elapsed());
211 res
212 }
213}
214
215impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
216 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
217 let start = Instant::now();
218 let res = self.state_provider.bytecode_by_hash(code_hash);
219 self.record_code_fetch(start.elapsed());
220 res
221 }
222}
223
224impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
225 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
226 self.state_provider.state_root(hashed_state)
227 }
228
229 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
230 self.state_provider.state_root_from_nodes(input)
231 }
232
233 fn state_root_with_updates(
234 &self,
235 hashed_state: HashedPostState,
236 ) -> ProviderResult<(B256, TrieUpdates)> {
237 self.state_provider.state_root_with_updates(hashed_state)
238 }
239
240 fn state_root_from_nodes_with_updates(
241 &self,
242 input: TrieInput,
243 ) -> ProviderResult<(B256, TrieUpdates)> {
244 self.state_provider.state_root_from_nodes_with_updates(input)
245 }
246}
247
248impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
249 fn proof(
250 &self,
251 input: TrieInput,
252 address: Address,
253 slots: &[B256],
254 ) -> ProviderResult<AccountProof> {
255 self.state_provider.proof(input, address, slots)
256 }
257
258 fn multiproof(
259 &self,
260 input: TrieInput,
261 targets: MultiProofTargets,
262 ) -> ProviderResult<MultiProof> {
263 self.state_provider.multiproof(input, targets)
264 }
265
266 fn witness(
267 &self,
268 input: TrieInput,
269 target: HashedPostState,
270 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
271 self.state_provider.witness(input, target)
272 }
273}
274
275impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
276 fn storage_root(
277 &self,
278 address: Address,
279 hashed_storage: HashedStorage,
280 ) -> ProviderResult<B256> {
281 self.state_provider.storage_root(address, hashed_storage)
282 }
283
284 fn storage_proof(
285 &self,
286 address: Address,
287 slot: B256,
288 hashed_storage: HashedStorage,
289 ) -> ProviderResult<StorageProof> {
290 self.state_provider.storage_proof(address, slot, hashed_storage)
291 }
292
293 fn storage_multiproof(
294 &self,
295 address: Address,
296 slots: &[B256],
297 hashed_storage: HashedStorage,
298 ) -> ProviderResult<StorageMultiProof> {
299 self.state_provider.storage_multiproof(address, slots, hashed_storage)
300 }
301}
302
303impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
304 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
305 self.state_provider.block_hash(number)
306 }
307
308 fn canonical_hashes_range(
309 &self,
310 start: alloy_primitives::BlockNumber,
311 end: alloy_primitives::BlockNumber,
312 ) -> ProviderResult<Vec<B256>> {
313 self.state_provider.canonical_hashes_range(start, end)
314 }
315}
316
317impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
318 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
319 self.state_provider.hashed_post_state(bundle_state)
320 }
321}