Skip to main content

reth_engine_tree/tree/
instrumented_state.rs

1//! Implements a state provider that tracks latency metrics.
2use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode, FastInstant as Instant};
7use reth_provider::{
8    AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9    StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12    updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13    MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16    sync::{
17        atomic::{AtomicU64, AtomicUsize, Ordering},
18        Arc,
19    },
20    time::Duration,
21};
22
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// A [`Duration`] accumulator that can be updated through a shared reference.
///
/// The running total is stored as a whole number of nanoseconds in a single [`AtomicU64`].
#[derive(Debug, Default)]
pub(crate) struct AtomicDuration {
    /// Total accumulated time, in nanoseconds.
    ///
    /// A `u64` can hold roughly 584 years worth of nanoseconds, which is far more than the
    /// latencies summed here are expected to reach. Arbitrary durations are not expected to be
    /// added to this value.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns the accumulated total as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `from_nanos` splits the total into whole seconds and the sub-second remainder
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds `duration` to the accumulated total.
    pub(crate) fn add_duration(&self, duration: Duration) {
        // Equivalent to `Duration::as_nanos`, but stays in `u64` instead of widening to `u128`;
        // inputs anywhere near the ~584 year capacity of a `u64` are not expected here.
        let whole_seconds_as_nanos = duration.as_secs() * u64::from(NANOS_PER_SEC);
        let subsecond_nanos = u64::from(duration.subsec_nanos());
        self.nanos.fetch_add(whole_seconds_as_nanos + subsecond_nanos, Ordering::Relaxed);
    }
}
58
59/// A wrapper of a state provider and latency metrics.
60#[derive(Debug)]
61pub struct InstrumentedStateProvider<S> {
62    /// The state provider
63    state_provider: S,
64    /// Prometheus metrics for the instrumented state provider
65    metrics: StateProviderMetrics,
66    /// Shared fetch statistics, readable after the provider is consumed.
67    stats: Arc<StateProviderStats>,
68}
69
70impl<S> InstrumentedStateProvider<S>
71where
72    S: StateProvider,
73{
74    /// Creates a new [`InstrumentedStateProvider`] from a state provider with the provided label
75    /// for metrics.
76    pub fn new(state_provider: S, source: &'static str) -> Self {
77        Self::with_stats(
78            state_provider,
79            StateProviderMetrics::with_source(source),
80            Arc::new(StateProviderStats::default()),
81        )
82    }
83
84    /// Creates a new [`InstrumentedStateProvider`] that writes into shared statistics.
85    pub(crate) const fn with_stats(
86        state_provider: S,
87        metrics: StateProviderMetrics,
88        stats: Arc<StateProviderStats>,
89    ) -> Self {
90        Self { state_provider, metrics, stats }
91    }
92
93    /// Returns a shared reference to the accumulated fetch statistics.
94    pub fn stats(&self) -> Arc<StateProviderStats> {
95        Arc::clone(&self.stats)
96    }
97}
98
99/// Metrics for the instrumented state provider
100#[derive(Metrics, Clone)]
101#[metrics(scope = "sync.state_provider")]
102pub(crate) struct StateProviderMetrics {
103    /// A histogram of the time it takes to get a storage value
104    storage_fetch_latency: Histogram,
105
106    /// A histogram of the time it takes to get a code value
107    code_fetch_latency: Histogram,
108
109    /// A histogram of the time it takes to get an account value
110    account_fetch_latency: Histogram,
111
112    /// A histogram of the total time we spend fetching storage over the lifetime of this state
113    /// provider
114    total_storage_fetch_latency: Histogram,
115
116    /// A gauge of the total time we spend fetching storage over the lifetime of this state
117    /// provider
118    total_storage_fetch_latency_gauge: Gauge,
119
120    /// A histogram of the total time we spend fetching code over the lifetime of this state
121    /// provider
122    total_code_fetch_latency: Histogram,
123
124    /// A gauge of the total time we spend fetching code over the lifetime of this state provider
125    total_code_fetch_latency_gauge: Gauge,
126
127    /// A histogram of the total time we spend fetching accounts over the lifetime of this state
128    /// provider
129    total_account_fetch_latency: Histogram,
130
131    /// A gauge of the total time we spend fetching accounts over the lifetime of this state
132    /// provider
133    total_account_fetch_latency_gauge: Gauge,
134}
135
136impl StateProviderMetrics {
137    /// Creates state-provider metrics with the given source label.
138    pub(crate) fn with_source(source: &'static str) -> Self {
139        Self::new_with_labels(&[("source", source)])
140    }
141
142    /// Records accumulated fetch latency totals.
143    pub(crate) fn record_totals(&self, stats: &StateProviderStats) {
144        let total_storage_fetch_latency = stats.total_storage_fetch_latency.duration();
145        self.total_storage_fetch_latency.record(total_storage_fetch_latency);
146        self.total_storage_fetch_latency_gauge.set(total_storage_fetch_latency.as_secs_f64());
147
148        let total_code_fetch_latency = stats.total_code_fetch_latency.duration();
149        self.total_code_fetch_latency.record(total_code_fetch_latency);
150        self.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
151
152        let total_account_fetch_latency = stats.total_account_fetch_latency.duration();
153        self.total_account_fetch_latency.record(total_account_fetch_latency);
154        self.total_account_fetch_latency_gauge.set(total_account_fetch_latency.as_secs_f64());
155    }
156}
157
158impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
159    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
160        let start = Instant::now();
161        let res = self.state_provider.basic_account(address);
162        let elapsed = start.elapsed();
163        self.metrics.account_fetch_latency.record(elapsed);
164        self.stats.total_account_fetches.fetch_add(1, Ordering::Relaxed);
165        self.stats.total_account_fetch_latency.add_duration(elapsed);
166        res
167    }
168}
169
170impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
171    fn storage(
172        &self,
173        account: Address,
174        storage_key: StorageKey,
175    ) -> ProviderResult<Option<StorageValue>> {
176        let start = Instant::now();
177        let res = self.state_provider.storage(account, storage_key);
178        let elapsed = start.elapsed();
179        self.metrics.storage_fetch_latency.record(elapsed);
180        self.stats.total_storage_fetches.fetch_add(1, Ordering::Relaxed);
181        self.stats.total_storage_fetch_latency.add_duration(elapsed);
182        res
183    }
184}
185
186impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
187    fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
188        let start = Instant::now();
189        let res = self.state_provider.bytecode_by_hash(code_hash);
190        let elapsed = start.elapsed();
191        self.metrics.code_fetch_latency.record(elapsed);
192        self.stats.total_code_fetches.fetch_add(1, Ordering::Relaxed);
193        self.stats.total_code_fetch_latency.add_duration(elapsed);
194        self.stats.total_code_fetched_bytes.fetch_add(
195            res.as_ref()
196                .ok()
197                .and_then(|code| code.as_ref().map(|code| code.len()))
198                .unwrap_or_default(),
199            Ordering::Relaxed,
200        );
201        res
202    }
203}
204
205impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
206    fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
207        self.state_provider.state_root(hashed_state)
208    }
209
210    fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
211        self.state_provider.state_root_from_nodes(input)
212    }
213
214    fn state_root_with_updates(
215        &self,
216        hashed_state: HashedPostState,
217    ) -> ProviderResult<(B256, TrieUpdates)> {
218        self.state_provider.state_root_with_updates(hashed_state)
219    }
220
221    fn state_root_from_nodes_with_updates(
222        &self,
223        input: TrieInput,
224    ) -> ProviderResult<(B256, TrieUpdates)> {
225        self.state_provider.state_root_from_nodes_with_updates(input)
226    }
227}
228
229impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
230    fn proof(
231        &self,
232        input: TrieInput,
233        address: Address,
234        slots: &[B256],
235    ) -> ProviderResult<AccountProof> {
236        self.state_provider.proof(input, address, slots)
237    }
238
239    fn multiproof(
240        &self,
241        input: TrieInput,
242        targets: MultiProofTargets,
243    ) -> ProviderResult<MultiProof> {
244        self.state_provider.multiproof(input, targets)
245    }
246
247    fn witness(
248        &self,
249        input: TrieInput,
250        target: HashedPostState,
251        mode: reth_trie::ExecutionWitnessMode,
252    ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
253        self.state_provider.witness(input, target, mode)
254    }
255}
256
257impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
258    fn storage_root(
259        &self,
260        address: Address,
261        hashed_storage: HashedStorage,
262    ) -> ProviderResult<B256> {
263        self.state_provider.storage_root(address, hashed_storage)
264    }
265
266    fn storage_proof(
267        &self,
268        address: Address,
269        slot: B256,
270        hashed_storage: HashedStorage,
271    ) -> ProviderResult<StorageProof> {
272        self.state_provider.storage_proof(address, slot, hashed_storage)
273    }
274
275    fn storage_multiproof(
276        &self,
277        address: Address,
278        slots: &[B256],
279        hashed_storage: HashedStorage,
280    ) -> ProviderResult<StorageMultiProof> {
281        self.state_provider.storage_multiproof(address, slots, hashed_storage)
282    }
283}
284
285impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
286    fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
287        self.state_provider.block_hash(number)
288    }
289
290    fn canonical_hashes_range(
291        &self,
292        start: alloy_primitives::BlockNumber,
293        end: alloy_primitives::BlockNumber,
294    ) -> ProviderResult<Vec<B256>> {
295        self.state_provider.canonical_hashes_range(start, end)
296    }
297}
298
299impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
300    fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
301        self.state_provider.hashed_post_state(bundle_state)
302    }
303}
304
305/// Accumulated fetch statistics from an [`InstrumentedStateProvider`].
306///
307/// Shared via `Arc` so statistics can be read after the provider is consumed.
308#[derive(Debug, Default)]
309pub struct StateProviderStats {
310    total_storage_fetches: AtomicUsize,
311    total_storage_fetch_latency: AtomicDuration,
312
313    total_code_fetches: AtomicUsize,
314    total_code_fetch_latency: AtomicDuration,
315    total_code_fetched_bytes: AtomicUsize,
316
317    total_account_fetches: AtomicUsize,
318    total_account_fetch_latency: AtomicDuration,
319}
320
321impl StateProviderStats {
322    /// Returns total number of storage fetches.
323    pub fn total_storage_fetches(&self) -> usize {
324        self.total_storage_fetches.load(Ordering::Relaxed)
325    }
326
327    /// Returns total time spent on storage fetches.
328    pub fn total_storage_fetch_latency(&self) -> Duration {
329        self.total_storage_fetch_latency.duration()
330    }
331
332    /// Returns total number of code fetches.
333    pub fn total_code_fetches(&self) -> usize {
334        self.total_code_fetches.load(Ordering::Relaxed)
335    }
336
337    /// Returns total time spent on code fetches.
338    pub fn total_code_fetch_latency(&self) -> Duration {
339        self.total_code_fetch_latency.duration()
340    }
341
342    /// Returns total amount of code fetched, in bytes.
343    pub fn total_code_fetched_bytes(&self) -> usize {
344        self.total_code_fetched_bytes.load(Ordering::Relaxed)
345    }
346
347    /// Returns total number of account fetches.
348    pub fn total_account_fetches(&self) -> usize {
349        self.total_account_fetches.load(Ordering::Relaxed)
350    }
351
352    /// Returns total time spent on account fetches.
353    pub fn total_account_fetch_latency(&self) -> Duration {
354        self.total_account_fetch_latency.duration()
355    }
356}