1use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8 AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9 StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::atomic::{AtomicU64, Ordering},
17 time::{Duration, Instant},
18};
19
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// A [`Duration`] accumulator that can be updated through a shared reference.
///
/// The running total is stored as a whole number of nanoseconds in a single [`AtomicU64`]; all
/// accesses use [`Ordering::Relaxed`].
#[derive(Debug, Default)]
pub(crate) struct AtomicDuration {
    /// Accumulated time, in nanoseconds.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Creates an accumulator holding a zero duration.
    pub(crate) const fn zero() -> Self {
        Self { nanos: AtomicU64::new(0) }
    }

    /// Returns the accumulated total as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `from_nanos` performs the seconds / sub-second split itself, so no narrowing cast is
        // needed here.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds `duration` to the accumulated total.
    ///
    /// The nanosecond count of a single `duration` saturates at `u64::MAX` (roughly 584 years)
    /// instead of overflowing. The addition onto the running total is a plain atomic
    /// `fetch_add`, which wraps around on overflow.
    pub(crate) fn add_duration(&self, duration: Duration) {
        // Same value as `Duration::as_nanos`, but kept in `u64` and saturating so that an
        // oversized input can neither panic (debug builds) nor silently wrap (release builds).
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
60
61#[derive(Debug)]
63pub struct InstrumentedStateProvider<S> {
64 state_provider: S,
66
67 metrics: StateProviderMetrics,
69
70 total_storage_fetch_latency: AtomicDuration,
72
73 total_code_fetch_latency: AtomicDuration,
75
76 total_account_fetch_latency: AtomicDuration,
78}
79
80impl<S> InstrumentedStateProvider<S>
81where
82 S: StateProvider,
83{
84 pub fn from_state_provider(state_provider: S, source: &'static str) -> Self {
87 Self {
88 state_provider,
89 metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),
90 total_storage_fetch_latency: AtomicDuration::zero(),
91 total_code_fetch_latency: AtomicDuration::zero(),
92 total_account_fetch_latency: AtomicDuration::zero(),
93 }
94 }
95}
96
97impl<S> InstrumentedStateProvider<S> {
98 fn record_storage_fetch(&self, latency: Duration) {
101 self.metrics.storage_fetch_latency.record(latency);
102 self.total_storage_fetch_latency.add_duration(latency);
103 }
104
105 fn record_code_fetch(&self, latency: Duration) {
108 self.metrics.code_fetch_latency.record(latency);
109 self.total_code_fetch_latency.add_duration(latency);
110 }
111
112 fn record_account_fetch(&self, latency: Duration) {
115 self.metrics.account_fetch_latency.record(latency);
116 self.total_account_fetch_latency.add_duration(latency);
117 }
118
119 pub(crate) fn record_total_latency(&self) {
121 let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
122 self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
123 self.metrics
124 .total_storage_fetch_latency_gauge
125 .set(total_storage_fetch_latency.as_secs_f64());
126
127 let total_code_fetch_latency = self.total_code_fetch_latency.duration();
128 self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
129 self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
130
131 let total_account_fetch_latency = self.total_account_fetch_latency.duration();
132 self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
133 self.metrics
134 .total_account_fetch_latency_gauge
135 .set(total_account_fetch_latency.as_secs_f64());
136 }
137}
138
139impl<S> Drop for InstrumentedStateProvider<S> {
140 fn drop(&mut self) {
141 self.record_total_latency();
142 }
143}
144
145#[derive(Metrics, Clone)]
147#[metrics(scope = "sync.state_provider")]
148pub(crate) struct StateProviderMetrics {
149 storage_fetch_latency: Histogram,
151
152 code_fetch_latency: Histogram,
154
155 account_fetch_latency: Histogram,
157
158 total_storage_fetch_latency: Histogram,
161
162 total_storage_fetch_latency_gauge: Gauge,
165
166 total_code_fetch_latency: Histogram,
169
170 total_code_fetch_latency_gauge: Gauge,
172
173 total_account_fetch_latency: Histogram,
176
177 total_account_fetch_latency_gauge: Gauge,
180}
181
182impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
183 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
184 let start = Instant::now();
185 let res = self.state_provider.basic_account(address);
186 self.record_account_fetch(start.elapsed());
187 res
188 }
189}
190
191impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
192 fn storage(
193 &self,
194 account: Address,
195 storage_key: StorageKey,
196 ) -> ProviderResult<Option<StorageValue>> {
197 let start = Instant::now();
198 let res = self.state_provider.storage(account, storage_key);
199 self.record_storage_fetch(start.elapsed());
200 res
201 }
202}
203
204impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
205 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
206 let start = Instant::now();
207 let res = self.state_provider.bytecode_by_hash(code_hash);
208 self.record_code_fetch(start.elapsed());
209 res
210 }
211}
212
213impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
214 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
215 self.state_provider.state_root(hashed_state)
216 }
217
218 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
219 self.state_provider.state_root_from_nodes(input)
220 }
221
222 fn state_root_with_updates(
223 &self,
224 hashed_state: HashedPostState,
225 ) -> ProviderResult<(B256, TrieUpdates)> {
226 self.state_provider.state_root_with_updates(hashed_state)
227 }
228
229 fn state_root_from_nodes_with_updates(
230 &self,
231 input: TrieInput,
232 ) -> ProviderResult<(B256, TrieUpdates)> {
233 self.state_provider.state_root_from_nodes_with_updates(input)
234 }
235}
236
237impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
238 fn proof(
239 &self,
240 input: TrieInput,
241 address: Address,
242 slots: &[B256],
243 ) -> ProviderResult<AccountProof> {
244 self.state_provider.proof(input, address, slots)
245 }
246
247 fn multiproof(
248 &self,
249 input: TrieInput,
250 targets: MultiProofTargets,
251 ) -> ProviderResult<MultiProof> {
252 self.state_provider.multiproof(input, targets)
253 }
254
255 fn witness(
256 &self,
257 input: TrieInput,
258 target: HashedPostState,
259 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
260 self.state_provider.witness(input, target)
261 }
262}
263
264impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
265 fn storage_root(
266 &self,
267 address: Address,
268 hashed_storage: HashedStorage,
269 ) -> ProviderResult<B256> {
270 self.state_provider.storage_root(address, hashed_storage)
271 }
272
273 fn storage_proof(
274 &self,
275 address: Address,
276 slot: B256,
277 hashed_storage: HashedStorage,
278 ) -> ProviderResult<StorageProof> {
279 self.state_provider.storage_proof(address, slot, hashed_storage)
280 }
281
282 fn storage_multiproof(
283 &self,
284 address: Address,
285 slots: &[B256],
286 hashed_storage: HashedStorage,
287 ) -> ProviderResult<StorageMultiProof> {
288 self.state_provider.storage_multiproof(address, slots, hashed_storage)
289 }
290}
291
292impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
293 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
294 self.state_provider.block_hash(number)
295 }
296
297 fn canonical_hashes_range(
298 &self,
299 start: alloy_primitives::BlockNumber,
300 end: alloy_primitives::BlockNumber,
301 ) -> ProviderResult<Vec<B256>> {
302 self.state_provider.canonical_hashes_range(start, end)
303 }
304}
305
306impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
307 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
308 self.state_provider.hashed_post_state(bundle_state)
309 }
310}