Skip to main content

reth_engine_tree/tree/
instrumented_state.rs

1//! Implements a state provider that tracks latency metrics.
2use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8    AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9    StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12    updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13    MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16    sync::atomic::{AtomicU64, Ordering},
17    time::{Duration, Instant},
18};
19
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// An atomic version of [`Duration`], using an [`AtomicU64`] to store the total nanoseconds in the
/// duration.
///
/// All accesses use [`Ordering::Relaxed`]: the value is a standalone counter and is not used to
/// synchronize access to any other memory.
#[derive(Debug, Default)]
pub(crate) struct AtomicDuration {
    /// Total number of nanoseconds accumulated so far.
    ///
    /// We would have to accumulate 584 years of nanoseconds to overflow a u64, so this is
    /// sufficiently large for our use case. We don't expect to be adding arbitrary durations to
    /// this value.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns a zero duration.
    pub(crate) const fn zero() -> Self {
        Self { nanos: AtomicU64::new(0) }
    }

    /// Returns the accumulated time as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `from_nanos` performs the seconds / sub-second split for us, without a narrowing cast.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds a [`Duration`] to the atomic duration.
    ///
    /// The input is converted to whole nanoseconds in a `u64`. A single duration that does not
    /// fit (more than roughly 584 years) is clamped to `u64::MAX` nanoseconds instead of
    /// overflowing. The accumulation itself still uses `fetch_add`, which wraps around if the
    /// running total ever exceeds `u64::MAX`.
    pub(crate) fn add_duration(&self, duration: Duration) {
        // this is `as_nanos` but without the `u128` - saturate rather than overflow on absurdly
        // large inputs, which would otherwise panic in builds with overflow checks enabled
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
60
/// A wrapper of a state provider and latency metrics.
///
/// Account, storage and bytecode reads made through this wrapper are timed. Each individual
/// latency is recorded into a histogram and also added to a per-category running total. The
/// running totals are published by `record_total_latency`, which is also invoked when the
/// wrapper is dropped.
#[derive(Debug)]
pub struct InstrumentedStateProvider<S> {
    /// The wrapped state provider that all calls are forwarded to
    state_provider: S,

    /// Metrics for the instrumented state provider
    metrics: StateProviderMetrics,

    /// The total time we spend fetching storage over the lifetime of this state provider
    total_storage_fetch_latency: AtomicDuration,

    /// The total time we spend fetching code over the lifetime of this state provider
    total_code_fetch_latency: AtomicDuration,

    /// The total time we spend fetching accounts over the lifetime of this state provider
    total_account_fetch_latency: AtomicDuration,
}
79
80impl<S> InstrumentedStateProvider<S>
81where
82    S: StateProvider,
83{
84    /// Creates a new [`InstrumentedStateProvider`] from a state provider with the provided label
85    /// for metrics.
86    pub fn new(state_provider: S, source: &'static str) -> Self {
87        Self {
88            state_provider,
89            metrics: StateProviderMetrics::new_with_labels(&[("source", source)]),
90            total_storage_fetch_latency: AtomicDuration::zero(),
91            total_code_fetch_latency: AtomicDuration::zero(),
92            total_account_fetch_latency: AtomicDuration::zero(),
93        }
94    }
95}
96
97impl<S> InstrumentedStateProvider<S> {
98    /// Records the latency for a storage fetch, and increments the duration counter for the storage
99    /// fetch.
100    fn record_storage_fetch(&self, latency: Duration) {
101        self.metrics.storage_fetch_latency.record(latency);
102        self.total_storage_fetch_latency.add_duration(latency);
103    }
104
105    /// Records the latency for a code fetch, and increments the duration counter for the code
106    /// fetch.
107    fn record_code_fetch(&self, latency: Duration) {
108        self.metrics.code_fetch_latency.record(latency);
109        self.total_code_fetch_latency.add_duration(latency);
110    }
111
112    /// Records the latency for an account fetch, and increments the duration counter for the
113    /// account fetch.
114    fn record_account_fetch(&self, latency: Duration) {
115        self.metrics.account_fetch_latency.record(latency);
116        self.total_account_fetch_latency.add_duration(latency);
117    }
118
119    /// Records the total latencies into their respective gauges and histograms.
120    pub(crate) fn record_total_latency(&self) {
121        let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
122        self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
123        self.metrics
124            .total_storage_fetch_latency_gauge
125            .set(total_storage_fetch_latency.as_secs_f64());
126
127        let total_code_fetch_latency = self.total_code_fetch_latency.duration();
128        self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
129        self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
130
131        let total_account_fetch_latency = self.total_account_fetch_latency.duration();
132        self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
133        self.metrics
134            .total_account_fetch_latency_gauge
135            .set(total_account_fetch_latency.as_secs_f64());
136    }
137}
138
139impl<S> Drop for InstrumentedStateProvider<S> {
140    fn drop(&mut self) {
141        self.record_total_latency();
142    }
143}
144
// NOTE(review): the `Metrics` derive presumably uses each field's doc comment as the exported
// metric description - confirm before rewording any of the field docs below.
/// Metrics for the instrumented state provider
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.state_provider")]
pub(crate) struct StateProviderMetrics {
    /// A histogram of the time it takes to get a storage value
    storage_fetch_latency: Histogram,

    /// A histogram of the time it takes to get a code value
    code_fetch_latency: Histogram,

    /// A histogram of the time it takes to get an account value
    account_fetch_latency: Histogram,

    /// A histogram of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching code over the lifetime of this state
    /// provider
    total_code_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching code over the lifetime of this state provider
    total_code_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency_gauge: Gauge,
}
181
182impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
183    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
184        let start = Instant::now();
185        let res = self.state_provider.basic_account(address);
186        self.record_account_fetch(start.elapsed());
187        res
188    }
189}
190
191impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
192    fn storage(
193        &self,
194        account: Address,
195        storage_key: StorageKey,
196    ) -> ProviderResult<Option<StorageValue>> {
197        let start = Instant::now();
198        let res = self.state_provider.storage(account, storage_key);
199        self.record_storage_fetch(start.elapsed());
200        res
201    }
202
203    fn storage_by_hashed_key(
204        &self,
205        address: Address,
206        hashed_storage_key: StorageKey,
207    ) -> ProviderResult<Option<StorageValue>> {
208        let start = Instant::now();
209        let res = self.state_provider.storage_by_hashed_key(address, hashed_storage_key);
210        self.record_storage_fetch(start.elapsed());
211        res
212    }
213}
214
215impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
216    fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
217        let start = Instant::now();
218        let res = self.state_provider.bytecode_by_hash(code_hash);
219        self.record_code_fetch(start.elapsed());
220        res
221    }
222}
223
224impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
225    fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
226        self.state_provider.state_root(hashed_state)
227    }
228
229    fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
230        self.state_provider.state_root_from_nodes(input)
231    }
232
233    fn state_root_with_updates(
234        &self,
235        hashed_state: HashedPostState,
236    ) -> ProviderResult<(B256, TrieUpdates)> {
237        self.state_provider.state_root_with_updates(hashed_state)
238    }
239
240    fn state_root_from_nodes_with_updates(
241        &self,
242        input: TrieInput,
243    ) -> ProviderResult<(B256, TrieUpdates)> {
244        self.state_provider.state_root_from_nodes_with_updates(input)
245    }
246}
247
248impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
249    fn proof(
250        &self,
251        input: TrieInput,
252        address: Address,
253        slots: &[B256],
254    ) -> ProviderResult<AccountProof> {
255        self.state_provider.proof(input, address, slots)
256    }
257
258    fn multiproof(
259        &self,
260        input: TrieInput,
261        targets: MultiProofTargets,
262    ) -> ProviderResult<MultiProof> {
263        self.state_provider.multiproof(input, targets)
264    }
265
266    fn witness(
267        &self,
268        input: TrieInput,
269        target: HashedPostState,
270    ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
271        self.state_provider.witness(input, target)
272    }
273}
274
275impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
276    fn storage_root(
277        &self,
278        address: Address,
279        hashed_storage: HashedStorage,
280    ) -> ProviderResult<B256> {
281        self.state_provider.storage_root(address, hashed_storage)
282    }
283
284    fn storage_proof(
285        &self,
286        address: Address,
287        slot: B256,
288        hashed_storage: HashedStorage,
289    ) -> ProviderResult<StorageProof> {
290        self.state_provider.storage_proof(address, slot, hashed_storage)
291    }
292
293    fn storage_multiproof(
294        &self,
295        address: Address,
296        slots: &[B256],
297        hashed_storage: HashedStorage,
298    ) -> ProviderResult<StorageMultiProof> {
299        self.state_provider.storage_multiproof(address, slots, hashed_storage)
300    }
301}
302
303impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
304    fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
305        self.state_provider.block_hash(number)
306    }
307
308    fn canonical_hashes_range(
309        &self,
310        start: alloy_primitives::BlockNumber,
311        end: alloy_primitives::BlockNumber,
312    ) -> ProviderResult<Vec<B256>> {
313        self.state_provider.canonical_hashes_range(start, end)
314    }
315}
316
317impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
318    fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
319        self.state_provider.hashed_post_state(bundle_state)
320    }
321}