reth_engine_tree/tree/
instrumented_state.rs

1//! Implements a state provider that tracks latency metrics.
2use alloy_primitives::{Address, StorageKey, StorageValue, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8    AccountReader, BlockHashReader, BytecodeReader, HashedPostStateProvider, StateProofProvider,
9    StateProvider, StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12    updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13    MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16    sync::atomic::{AtomicU64, Ordering},
17    time::{Duration, Instant},
18};
19
/// Nanoseconds per second
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// An atomic version of [`Duration`], using an [`AtomicU64`] to store the total nanoseconds in the
/// duration.
///
/// Every access uses [`Ordering::Relaxed`]; the value is only ever added to and read back as a
/// total.
#[derive(Default)]
pub(crate) struct AtomicDuration {
    /// Total nanoseconds accumulated so far.
    ///
    /// We would have to accumulate 584 years of nanoseconds to overflow a u64, so this is
    /// sufficiently large for our use case. We don't expect to be adding arbitrary durations to
    /// this value.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns a zero duration.
    pub(crate) const fn zero() -> Self {
        Self { nanos: AtomicU64::new(0) }
    }

    /// Returns the accumulated total as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `from_nanos` performs the seconds / sub-second split for us.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds a [`Duration`] to the atomic duration.
    ///
    /// The input is converted to whole nanoseconds in a `u64`. A duration too large to be
    /// represented that way (more than roughly 584 years) is clamped to `u64::MAX` nanoseconds
    /// instead of overflowing the conversion. The running total itself still wraps around on
    /// overflow, as [`AtomicU64::fetch_add`] does.
    pub(crate) fn add_duration(&self, duration: Duration) {
        // Equivalent to `as_nanos` narrowed to `u64`, but saturating: plain `*` / `+` would panic
        // in debug builds and silently wrap in release builds for oversized inputs.
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
60
61/// A wrapper of a state provider and latency metrics.
62pub(crate) struct InstrumentedStateProvider<S> {
63    /// The state provider
64    state_provider: S,
65
66    /// Metrics for the instrumented state provider
67    metrics: StateProviderMetrics,
68
69    /// The total time we spend fetching storage over the lifetime of this state provider
70    total_storage_fetch_latency: AtomicDuration,
71
72    /// The total time we spend fetching code over the lifetime of this state provider
73    total_code_fetch_latency: AtomicDuration,
74
75    /// The total time we spend fetching accounts over the lifetime of this state provider
76    total_account_fetch_latency: AtomicDuration,
77}
78
79impl<S> InstrumentedStateProvider<S>
80where
81    S: StateProvider,
82{
83    /// Creates a new [`InstrumentedStateProvider`] from a state provider
84    pub(crate) fn from_state_provider(state_provider: S) -> Self {
85        Self {
86            state_provider,
87            metrics: StateProviderMetrics::default(),
88            total_storage_fetch_latency: AtomicDuration::zero(),
89            total_code_fetch_latency: AtomicDuration::zero(),
90            total_account_fetch_latency: AtomicDuration::zero(),
91        }
92    }
93}
94
95impl<S> InstrumentedStateProvider<S> {
96    /// Records the latency for a storage fetch, and increments the duration counter for the storage
97    /// fetch.
98    fn record_storage_fetch(&self, latency: Duration) {
99        self.metrics.storage_fetch_latency.record(latency);
100        self.total_storage_fetch_latency.add_duration(latency);
101    }
102
103    /// Records the latency for a code fetch, and increments the duration counter for the code
104    /// fetch.
105    fn record_code_fetch(&self, latency: Duration) {
106        self.metrics.code_fetch_latency.record(latency);
107        self.total_code_fetch_latency.add_duration(latency);
108    }
109
110    /// Records the latency for an account fetch, and increments the duration counter for the
111    /// account fetch.
112    fn record_account_fetch(&self, latency: Duration) {
113        self.metrics.account_fetch_latency.record(latency);
114        self.total_account_fetch_latency.add_duration(latency);
115    }
116
117    /// Records the total latencies into their respective gauges and histograms.
118    pub(crate) fn record_total_latency(&self) {
119        let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
120        self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
121        self.metrics
122            .total_storage_fetch_latency_gauge
123            .set(total_storage_fetch_latency.as_secs_f64());
124
125        let total_code_fetch_latency = self.total_code_fetch_latency.duration();
126        self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
127        self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
128
129        let total_account_fetch_latency = self.total_account_fetch_latency.duration();
130        self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
131        self.metrics
132            .total_account_fetch_latency_gauge
133            .set(total_account_fetch_latency.as_secs_f64());
134    }
135}
136
/// Metrics for the instrumented state provider
// NOTE(review): the `Metrics` derive presumably turns each field's doc comment into that metric's
// description, so the field docs below may be runtime-visible text — confirm before rewording.
// The metrics are declared under the `sync.state_provider` scope (see the attribute below).
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.state_provider")]
pub(crate) struct StateProviderMetrics {
    /// A histogram of the time it takes to get a storage value
    storage_fetch_latency: Histogram,

    /// A histogram of the time it takes to get a code value
    code_fetch_latency: Histogram,

    /// A histogram of the time it takes to get an account value
    account_fetch_latency: Histogram,

    /// A histogram of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching code over the lifetime of this state
    /// provider
    total_code_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching code over the lifetime of this state provider
    total_code_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency_gauge: Gauge,
}
173
174impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
175    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
176        let start = Instant::now();
177        let res = self.state_provider.basic_account(address);
178        self.record_account_fetch(start.elapsed());
179        res
180    }
181}
182
183impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
184    fn storage(
185        &self,
186        account: Address,
187        storage_key: StorageKey,
188    ) -> ProviderResult<Option<StorageValue>> {
189        let start = Instant::now();
190        let res = self.state_provider.storage(account, storage_key);
191        self.record_storage_fetch(start.elapsed());
192        res
193    }
194}
195
196impl<S: BytecodeReader> BytecodeReader for InstrumentedStateProvider<S> {
197    fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
198        let start = Instant::now();
199        let res = self.state_provider.bytecode_by_hash(code_hash);
200        self.record_code_fetch(start.elapsed());
201        res
202    }
203}
204
205impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
206    fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
207        self.state_provider.state_root(hashed_state)
208    }
209
210    fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
211        self.state_provider.state_root_from_nodes(input)
212    }
213
214    fn state_root_with_updates(
215        &self,
216        hashed_state: HashedPostState,
217    ) -> ProviderResult<(B256, TrieUpdates)> {
218        self.state_provider.state_root_with_updates(hashed_state)
219    }
220
221    fn state_root_from_nodes_with_updates(
222        &self,
223        input: TrieInput,
224    ) -> ProviderResult<(B256, TrieUpdates)> {
225        self.state_provider.state_root_from_nodes_with_updates(input)
226    }
227}
228
229impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
230    fn proof(
231        &self,
232        input: TrieInput,
233        address: Address,
234        slots: &[B256],
235    ) -> ProviderResult<AccountProof> {
236        self.state_provider.proof(input, address, slots)
237    }
238
239    fn multiproof(
240        &self,
241        input: TrieInput,
242        targets: MultiProofTargets,
243    ) -> ProviderResult<MultiProof> {
244        self.state_provider.multiproof(input, targets)
245    }
246
247    fn witness(
248        &self,
249        input: TrieInput,
250        target: HashedPostState,
251    ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
252        self.state_provider.witness(input, target)
253    }
254}
255
256impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
257    fn storage_root(
258        &self,
259        address: Address,
260        hashed_storage: HashedStorage,
261    ) -> ProviderResult<B256> {
262        self.state_provider.storage_root(address, hashed_storage)
263    }
264
265    fn storage_proof(
266        &self,
267        address: Address,
268        slot: B256,
269        hashed_storage: HashedStorage,
270    ) -> ProviderResult<StorageProof> {
271        self.state_provider.storage_proof(address, slot, hashed_storage)
272    }
273
274    fn storage_multiproof(
275        &self,
276        address: Address,
277        slots: &[B256],
278        hashed_storage: HashedStorage,
279    ) -> ProviderResult<StorageMultiProof> {
280        self.state_provider.storage_multiproof(address, slots, hashed_storage)
281    }
282}
283
284impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
285    fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
286        self.state_provider.block_hash(number)
287    }
288
289    fn canonical_hashes_range(
290        &self,
291        start: alloy_primitives::BlockNumber,
292        end: alloy_primitives::BlockNumber,
293    ) -> ProviderResult<Vec<B256>> {
294        self.state_provider.canonical_hashes_range(start, end)
295    }
296}
297
298impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
299    fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
300        self.state_provider.hashed_post_state(bundle_state)
301    }
302}