1use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8 AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9 StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::atomic::{AtomicU64, Ordering},
17 time::{Duration, Instant},
18};
19
20const NANOS_PER_SEC: u32 = 1_000_000_000;
22
23#[derive(Default)]
26pub(crate) struct AtomicDuration {
27 nanos: AtomicU64,
33}
34
35impl AtomicDuration {
36 pub(crate) const fn zero() -> Self {
38 Self { nanos: AtomicU64::new(0) }
39 }
40
41 pub(crate) fn duration(&self) -> Duration {
43 let nanos = self.nanos.load(Ordering::Relaxed);
44 let seconds = nanos / NANOS_PER_SEC as u64;
45 let nanos = nanos % NANOS_PER_SEC as u64;
46 Duration::new(seconds, nanos as u32)
48 }
49
50 pub(crate) fn add_duration(&self, duration: Duration) {
52 let total_nanos =
55 duration.as_secs() * NANOS_PER_SEC as u64 + duration.subsec_nanos() as u64;
56 self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
58 }
59}
60
61pub(crate) struct InstrumentedStateProvider<S> {
63 state_provider: S,
65
66 metrics: StateProviderMetrics,
68
69 total_storage_fetch_latency: AtomicDuration,
71
72 total_code_fetch_latency: AtomicDuration,
74
75 total_account_fetch_latency: AtomicDuration,
77}
78
79impl<S> InstrumentedStateProvider<S>
80where
81 S: StateProvider,
82{
83 pub(crate) fn from_state_provider(state_provider: S) -> Self {
85 Self {
86 state_provider,
87 metrics: StateProviderMetrics::default(),
88 total_storage_fetch_latency: AtomicDuration::zero(),
89 total_code_fetch_latency: AtomicDuration::zero(),
90 total_account_fetch_latency: AtomicDuration::zero(),
91 }
92 }
93}
94
95impl<S> InstrumentedStateProvider<S> {
96 fn record_storage_fetch(&self, latency: Duration) {
99 self.metrics.storage_fetch_latency.record(latency);
100 self.total_storage_fetch_latency.add_duration(latency);
101 }
102
103 fn record_code_fetch(&self, latency: Duration) {
106 self.metrics.code_fetch_latency.record(latency);
107 self.total_code_fetch_latency.add_duration(latency);
108 }
109
110 fn record_account_fetch(&self, latency: Duration) {
113 self.metrics.account_fetch_latency.record(latency);
114 self.total_account_fetch_latency.add_duration(latency);
115 }
116
117 pub(crate) fn record_total_latency(&self) {
119 let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
120 self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
121 self.metrics
122 .total_storage_fetch_latency_gauge
123 .set(total_storage_fetch_latency.as_secs_f64());
124
125 let total_code_fetch_latency = self.total_code_fetch_latency.duration();
126 self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
127 self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
128
129 let total_account_fetch_latency = self.total_account_fetch_latency.duration();
130 self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
131 self.metrics
132 .total_account_fetch_latency_gauge
133 .set(total_account_fetch_latency.as_secs_f64());
134 }
135}
136
137#[derive(Metrics, Clone)]
139#[metrics(scope = "sync.state_provider")]
140pub(crate) struct StateProviderMetrics {
141 storage_fetch_latency: Histogram,
143
144 code_fetch_latency: Histogram,
146
147 account_fetch_latency: Histogram,
149
150 total_storage_fetch_latency: Histogram,
153
154 total_storage_fetch_latency_gauge: Gauge,
157
158 total_code_fetch_latency: Histogram,
161
162 total_code_fetch_latency_gauge: Gauge,
164
165 total_account_fetch_latency: Histogram,
168
169 total_account_fetch_latency_gauge: Gauge,
172}
173
174impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
175 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
176 let start = Instant::now();
177 let res = self.state_provider.basic_account(address);
178 self.record_account_fetch(start.elapsed());
179 res
180 }
181}
182
183impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
184 fn storage(
185 &self,
186 account: Address,
187 storage_key: StorageKey,
188 ) -> ProviderResult<Option<StorageValue>> {
189 let start = Instant::now();
190 let res = self.state_provider.storage(account, storage_key);
191 self.record_storage_fetch(start.elapsed());
192 res
193 }
194}
195
196impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
197 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
198 let start = Instant::now();
199 let res = self.state_provider.bytecode_by_hash(code_hash);
200 self.record_code_fetch(start.elapsed());
201 res
202 }
203}
204
205impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
206 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
207 self.state_provider.state_root(hashed_state)
208 }
209
210 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
211 self.state_provider.state_root_from_nodes(input)
212 }
213
214 fn state_root_with_updates(
215 &self,
216 hashed_state: HashedPostState,
217 ) -> ProviderResult<(B256, TrieUpdates)> {
218 self.state_provider.state_root_with_updates(hashed_state)
219 }
220
221 fn state_root_from_nodes_with_updates(
222 &self,
223 input: TrieInput,
224 ) -> ProviderResult<(B256, TrieUpdates)> {
225 self.state_provider.state_root_from_nodes_with_updates(input)
226 }
227}
228
229impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
230 fn proof(
231 &self,
232 input: TrieInput,
233 address: Address,
234 slots: &[B256],
235 ) -> ProviderResult<AccountProof> {
236 self.state_provider.proof(input, address, slots)
237 }
238
239 fn multiproof(
240 &self,
241 input: TrieInput,
242 targets: MultiProofTargets,
243 ) -> ProviderResult<MultiProof> {
244 self.state_provider.multiproof(input, targets)
245 }
246
247 fn witness(
248 &self,
249 input: TrieInput,
250 target: HashedPostState,
251 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
252 self.state_provider.witness(input, target)
253 }
254}
255
256impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
257 fn storage_root(
258 &self,
259 address: Address,
260 hashed_storage: HashedStorage,
261 ) -> ProviderResult<B256> {
262 self.state_provider.storage_root(address, hashed_storage)
263 }
264
265 fn storage_proof(
266 &self,
267 address: Address,
268 slot: B256,
269 hashed_storage: HashedStorage,
270 ) -> ProviderResult<StorageProof> {
271 self.state_provider.storage_proof(address, slot, hashed_storage)
272 }
273
274 fn storage_multiproof(
275 &self,
276 address: Address,
277 slots: &[B256],
278 hashed_storage: HashedStorage,
279 ) -> ProviderResult<StorageMultiProof> {
280 self.state_provider.storage_multiproof(address, slots, hashed_storage)
281 }
282}
283
284impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
285 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
286 self.state_provider.block_hash(number)
287 }
288
289 fn canonical_hashes_range(
290 &self,
291 start: alloy_primitives::BlockNumber,
292 end: alloy_primitives::BlockNumber,
293 ) -> ProviderResult<Vec<B256>> {
294 self.state_provider.canonical_hashes_range(start, end)
295 }
296}
297
298impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
299 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
300 self.state_provider.hashed_post_state(bundle_state)
301 }
302}