reth_provider/providers/state/
overlay.rs

1use alloy_primitives::{BlockNumber, B256};
2use metrics::{Counter, Histogram};
3use parking_lot::RwLock;
4use reth_db_api::DatabaseError;
5use reth_errors::{ProviderError, ProviderResult};
6use reth_metrics::Metrics;
7use reth_prune_types::PruneSegment;
8use reth_stages_types::StageId;
9use reth_storage_api::{
10    BlockNumReader, DBProvider, DatabaseProviderFactory, DatabaseProviderROFactory,
11    PruneCheckpointReader, StageCheckpointReader, TrieReader,
12};
13use reth_trie::{
14    hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
15    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
16    updates::TrieUpdatesSorted,
17    HashedPostStateSorted, KeccakKeyHasher,
18};
19use reth_trie_db::{
20    DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
21};
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::Arc,
25    time::{Duration, Instant},
26};
27use tracing::{debug, debug_span, instrument};
28
29/// Metrics for overlay state provider operations.
30#[derive(Clone, Metrics)]
31#[metrics(scope = "storage.providers.overlay")]
32pub(crate) struct OverlayStateProviderMetrics {
33    /// Duration of creating the database provider transaction
34    create_provider_duration: Histogram,
35    /// Duration of retrieving trie updates from the database
36    retrieve_trie_reverts_duration: Histogram,
37    /// Duration of retrieving hashed state from the database
38    retrieve_hashed_state_reverts_duration: Histogram,
39    /// Size of trie updates (number of entries)
40    trie_updates_size: Histogram,
41    /// Size of hashed state (number of entries)
42    hashed_state_size: Histogram,
43    /// Overall duration of the [`OverlayStateProviderFactory::database_provider_ro`] call
44    database_provider_ro_duration: Histogram,
45    /// Number of cache misses when fetching [`Overlay`]s from the overlay cache.
46    overlay_cache_misses: Counter,
47}
48
49/// Contains all fields required to initialize an [`OverlayStateProvider`].
50#[derive(Debug, Clone)]
51struct Overlay {
52    trie_updates: Arc<TrieUpdatesSorted>,
53    hashed_post_state: Arc<HashedPostStateSorted>,
54}
55
56/// Factory for creating overlay state providers with optional reverts and overlays.
57///
58/// This factory allows building an `OverlayStateProvider` whose DB state has been reverted to a
59/// particular block, and/or with additional overlay information added on top.
60#[derive(Debug, Clone)]
61pub struct OverlayStateProviderFactory<F> {
62    /// The underlying database provider factory
63    factory: F,
64    /// Optional block hash for collecting reverts
65    block_hash: Option<B256>,
66    /// Optional trie overlay
67    trie_overlay: Option<Arc<TrieUpdatesSorted>>,
68    /// Optional hashed state overlay
69    hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
70    /// Metrics for tracking provider operations
71    metrics: OverlayStateProviderMetrics,
72    /// A cache which maps `db_tip -> Overlay`. If the db tip changes during usage of the factory
73    /// then a new entry will get added to this, but in most cases only one entry is present.
74    overlay_cache: Arc<RwLock<HashMap<BlockNumber, Overlay>>>,
75}
76
77impl<F> OverlayStateProviderFactory<F> {
78    /// Create a new overlay state provider factory
79    pub fn new(factory: F) -> Self {
80        Self {
81            factory,
82            block_hash: None,
83            trie_overlay: None,
84            hashed_state_overlay: None,
85            metrics: OverlayStateProviderMetrics::default(),
86            overlay_cache: Default::default(),
87        }
88    }
89
90    /// Set the block hash for collecting reverts. All state will be reverted to the point
91    /// _after_ this block has been processed.
92    pub const fn with_block_hash(mut self, block_hash: Option<B256>) -> Self {
93        self.block_hash = block_hash;
94        self
95    }
96
97    /// Set the trie overlay.
98    ///
99    /// This overlay will be applied on top of any reverts applied via `with_block_hash`.
100    pub fn with_trie_overlay(mut self, trie_overlay: Option<Arc<TrieUpdatesSorted>>) -> Self {
101        self.trie_overlay = trie_overlay;
102        self
103    }
104
105    /// Set the hashed state overlay
106    ///
107    /// This overlay will be applied on top of any reverts applied via `with_block_hash`.
108    pub fn with_hashed_state_overlay(
109        mut self,
110        hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
111    ) -> Self {
112        self.hashed_state_overlay = hashed_state_overlay;
113        self
114    }
115}
116
117impl<F> OverlayStateProviderFactory<F>
118where
119    F: DatabaseProviderFactory,
120    F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
121{
122    /// Returns the block number for [`Self`]'s `block_hash` field, if any.
123    fn get_requested_block_number(
124        &self,
125        provider: &F::Provider,
126    ) -> ProviderResult<Option<BlockNumber>> {
127        if let Some(block_hash) = self.block_hash {
128            Ok(Some(
129                provider
130                    .convert_hash_or_number(block_hash.into())?
131                    .ok_or_else(|| ProviderError::BlockHashNotFound(block_hash))?,
132            ))
133        } else {
134            Ok(None)
135        }
136    }
137
138    /// Returns the block which is at the tip of the DB, i.e. the block which the state tables of
139    /// the DB are currently synced to.
140    fn get_db_tip_block_number(&self, provider: &F::Provider) -> ProviderResult<BlockNumber> {
141        provider
142            .get_stage_checkpoint(StageId::MerkleChangeSets)?
143            .as_ref()
144            .map(|chk| chk.block_number)
145            .ok_or_else(|| ProviderError::InsufficientChangesets { requested: 0, available: 0..=0 })
146    }
147
148    /// Returns whether or not it is required to collect reverts, and validates that there are
149    /// sufficient changesets to revert to the requested block number if so.
150    ///
151    /// Returns an error if the `MerkleChangeSets` checkpoint doesn't cover the requested block.
152    /// Takes into account both the stage checkpoint and the prune checkpoint to determine the
153    /// available data range.
154    fn reverts_required(
155        &self,
156        provider: &F::Provider,
157        db_tip_block: BlockNumber,
158        requested_block: BlockNumber,
159    ) -> ProviderResult<bool> {
160        // If the requested block is the DB tip then there won't be any reverts necessary, and we
161        // can simply return Ok.
162        if db_tip_block == requested_block {
163            return Ok(false)
164        }
165
166        // Get the MerkleChangeSets prune checkpoints, which will be used to determine the lower
167        // bound.
168        let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;
169
170        // Extract the lower bound from prune checkpoint if available.
171        //
172        // If not available we assume pruning has never ran and so there is no lower bound. This
173        // should not generally happen, since MerkleChangeSets always have pruning enabled, but when
174        // starting a new node from scratch (e.g. in a test case or benchmark) it can surface.
175        //
176        // The prune checkpoint's block_number is the highest pruned block, so data is available
177        // starting from the next block
178        let lower_bound = prune_checkpoint
179            .and_then(|chk| chk.block_number)
180            .map(|block_number| block_number + 1)
181            .unwrap_or_default();
182
183        let available_range = lower_bound..=db_tip_block;
184
185        // Check if the requested block is within the available range
186        if !available_range.contains(&requested_block) {
187            return Err(ProviderError::InsufficientChangesets {
188                requested: requested_block,
189                available: available_range,
190            });
191        }
192
193        Ok(true)
194    }
195
196    /// Calculates a new [`Overlay`] given a transaction and the current db tip.
197    #[instrument(
198        level = "debug",
199        target = "providers::state::overlay",
200        skip_all,
201        fields(db_tip_block)
202    )]
203    fn calculate_overlay(
204        &self,
205        provider: &F::Provider,
206        db_tip_block: BlockNumber,
207    ) -> ProviderResult<Overlay> {
208        // Set up variables we'll use for recording metrics. There's two different code-paths here,
209        // and we want to make sure both record metrics, so we do metrics recording after.
210        let retrieve_trie_reverts_duration;
211        let retrieve_hashed_state_reverts_duration;
212        let trie_updates_total_len;
213        let hashed_state_updates_total_len;
214
215        // If block_hash is provided, collect reverts
216        let (trie_updates, hashed_post_state) = if let Some(from_block) =
217            self.get_requested_block_number(provider)? &&
218            self.reverts_required(provider, db_tip_block, from_block)?
219        {
220            // Collect trie reverts
221            let mut trie_reverts = {
222                let _guard =
223                    debug_span!(target: "providers::state::overlay", "Retrieving trie reverts")
224                        .entered();
225
226                let start = Instant::now();
227                let res = provider.trie_reverts(from_block + 1)?;
228                retrieve_trie_reverts_duration = start.elapsed();
229                res
230            };
231
232            // Collect state reverts
233            let mut hashed_state_reverts = {
234                let _guard = debug_span!(target: "providers::state::overlay", "Retrieving hashed state reverts").entered();
235
236                let start = Instant::now();
237                let res = HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(
238                    provider.tx_ref(),
239                    from_block + 1..,
240                )?;
241                retrieve_hashed_state_reverts_duration = start.elapsed();
242                res
243            };
244
245            // Extend with overlays if provided. If the reverts are empty we should just use the
246            // overlays directly, because `extend_ref` will actually clone the overlay.
247            let trie_updates = match self.trie_overlay.as_ref() {
248                Some(trie_overlay) if trie_reverts.is_empty() => Arc::clone(trie_overlay),
249                Some(trie_overlay) => {
250                    trie_reverts.extend_ref(trie_overlay);
251                    Arc::new(trie_reverts)
252                }
253                None => Arc::new(trie_reverts),
254            };
255
256            let hashed_state_updates = match self.hashed_state_overlay.as_ref() {
257                Some(hashed_state_overlay) if hashed_state_reverts.is_empty() => {
258                    Arc::clone(hashed_state_overlay)
259                }
260                Some(hashed_state_overlay) => {
261                    hashed_state_reverts.extend_ref(hashed_state_overlay);
262                    Arc::new(hashed_state_reverts)
263                }
264                None => Arc::new(hashed_state_reverts),
265            };
266
267            trie_updates_total_len = trie_updates.total_len();
268            hashed_state_updates_total_len = hashed_state_updates.total_len();
269
270            debug!(
271                target: "providers::state::overlay",
272                block_hash = ?self.block_hash,
273                ?from_block,
274                num_trie_updates = ?trie_updates_total_len,
275                num_state_updates = ?hashed_state_updates_total_len,
276                "Reverted to target block",
277            );
278
279            (trie_updates, hashed_state_updates)
280        } else {
281            // If no block_hash, use overlays directly or defaults
282            let trie_updates =
283                self.trie_overlay.clone().unwrap_or_else(|| Arc::new(TrieUpdatesSorted::default()));
284            let hashed_state = self
285                .hashed_state_overlay
286                .clone()
287                .unwrap_or_else(|| Arc::new(HashedPostStateSorted::default()));
288
289            retrieve_trie_reverts_duration = Duration::ZERO;
290            retrieve_hashed_state_reverts_duration = Duration::ZERO;
291            trie_updates_total_len = trie_updates.total_len();
292            hashed_state_updates_total_len = hashed_state.total_len();
293
294            (trie_updates, hashed_state)
295        };
296
297        // Record metrics
298        self.metrics
299            .retrieve_trie_reverts_duration
300            .record(retrieve_trie_reverts_duration.as_secs_f64());
301        self.metrics
302            .retrieve_hashed_state_reverts_duration
303            .record(retrieve_hashed_state_reverts_duration.as_secs_f64());
304        self.metrics.trie_updates_size.record(trie_updates_total_len as f64);
305        self.metrics.hashed_state_size.record(hashed_state_updates_total_len as f64);
306
307        Ok(Overlay { trie_updates, hashed_post_state })
308    }
309
310    /// Fetches an [`Overlay`] from the cache based on the current db tip block. If there is no
311    /// cached value then this calculates the [`Overlay`] and populates the cache.
312    #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
313    fn get_overlay(&self, provider: &F::Provider) -> ProviderResult<Overlay> {
314        // If we have no anchor block configured then we will never need to get trie reverts, just
315        // return the in-memory overlay.
316        if self.block_hash.is_none() {
317            let trie_updates =
318                self.trie_overlay.clone().unwrap_or_else(|| Arc::new(TrieUpdatesSorted::default()));
319            let hashed_post_state = self
320                .hashed_state_overlay
321                .clone()
322                .unwrap_or_else(|| Arc::new(HashedPostStateSorted::default()));
323            return Ok(Overlay { trie_updates, hashed_post_state })
324        }
325
326        let db_tip_block = self.get_db_tip_block_number(provider)?;
327
328        // If the overlay is present in the cache then return it directly.
329        if let Some(overlay) = self.overlay_cache.as_ref().read().get(&db_tip_block) {
330            return Ok(overlay.clone());
331        }
332
333        // If the overlay is not present then we need to calculate a new one. We grab a write lock,
334        // and then check the cache again in case some other thread populated the cache since we
335        // checked with the read-lock. If still not present we calculate and populate.
336        let mut cache_miss = false;
337        let overlay = match self.overlay_cache.as_ref().write().entry(db_tip_block) {
338            Entry::Occupied(entry) => entry.get().clone(),
339            Entry::Vacant(entry) => {
340                cache_miss = true;
341                let overlay = self.calculate_overlay(provider, db_tip_block)?;
342                entry.insert(overlay.clone());
343                overlay
344            }
345        };
346
347        if cache_miss {
348            self.metrics.overlay_cache_misses.increment(1);
349        }
350
351        Ok(overlay)
352    }
353}
354
355impl<F> DatabaseProviderROFactory for OverlayStateProviderFactory<F>
356where
357    F: DatabaseProviderFactory,
358    F::Provider: TrieReader + StageCheckpointReader + PruneCheckpointReader + BlockNumReader,
359{
360    type Provider = OverlayStateProvider<F::Provider>;
361
362    /// Create a read-only [`OverlayStateProvider`].
363    #[instrument(level = "debug", target = "providers::state::overlay", skip_all)]
364    fn database_provider_ro(&self) -> ProviderResult<OverlayStateProvider<F::Provider>> {
365        let overall_start = Instant::now();
366
367        // Get a read-only provider
368        let provider = {
369            let _guard =
370                debug_span!(target: "providers::state::overlay", "Creating db provider").entered();
371
372            let start = Instant::now();
373            let res = self.factory.database_provider_ro()?;
374            self.metrics.create_provider_duration.record(start.elapsed());
375            res
376        };
377
378        let Overlay { trie_updates, hashed_post_state } = self.get_overlay(&provider)?;
379
380        self.metrics.database_provider_ro_duration.record(overall_start.elapsed());
381        Ok(OverlayStateProvider::new(provider, trie_updates, hashed_post_state))
382    }
383}
384
385/// State provider with in-memory overlay from trie updates and hashed post state.
386///
387/// This provider uses in-memory trie updates and hashed post state as an overlay
388/// on top of a database provider, implementing [`TrieCursorFactory`] and [`HashedCursorFactory`]
389/// using the in-memory overlay factories.
390#[derive(Debug)]
391pub struct OverlayStateProvider<Provider: DBProvider> {
392    provider: Provider,
393    trie_updates: Arc<TrieUpdatesSorted>,
394    hashed_post_state: Arc<HashedPostStateSorted>,
395}
396
397impl<Provider> OverlayStateProvider<Provider>
398where
399    Provider: DBProvider,
400{
401    /// Create new overlay state provider. The `Provider` must be cloneable, which generally means
402    /// it should be wrapped in an `Arc`.
403    pub const fn new(
404        provider: Provider,
405        trie_updates: Arc<TrieUpdatesSorted>,
406        hashed_post_state: Arc<HashedPostStateSorted>,
407    ) -> Self {
408        Self { provider, trie_updates, hashed_post_state }
409    }
410}
411
412impl<Provider> TrieCursorFactory for OverlayStateProvider<Provider>
413where
414    Provider: DBProvider,
415{
416    type AccountTrieCursor<'a>
417        = <InMemoryTrieCursorFactory<
418        DatabaseTrieCursorFactory<&'a Provider::Tx>,
419        &'a TrieUpdatesSorted,
420    > as TrieCursorFactory>::AccountTrieCursor<'a>
421    where
422        Self: 'a;
423
424    type StorageTrieCursor<'a>
425        = <InMemoryTrieCursorFactory<
426        DatabaseTrieCursorFactory<&'a Provider::Tx>,
427        &'a TrieUpdatesSorted,
428    > as TrieCursorFactory>::StorageTrieCursor<'a>
429    where
430        Self: 'a;
431
432    fn account_trie_cursor(&self) -> Result<Self::AccountTrieCursor<'_>, DatabaseError> {
433        let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
434        let trie_cursor_factory =
435            InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
436        trie_cursor_factory.account_trie_cursor()
437    }
438
439    fn storage_trie_cursor(
440        &self,
441        hashed_address: B256,
442    ) -> Result<Self::StorageTrieCursor<'_>, DatabaseError> {
443        let db_trie_cursor_factory = DatabaseTrieCursorFactory::new(self.provider.tx_ref());
444        let trie_cursor_factory =
445            InMemoryTrieCursorFactory::new(db_trie_cursor_factory, self.trie_updates.as_ref());
446        trie_cursor_factory.storage_trie_cursor(hashed_address)
447    }
448}
449
450impl<Provider> HashedCursorFactory for OverlayStateProvider<Provider>
451where
452    Provider: DBProvider,
453{
454    type AccountCursor<'a>
455        = <HashedPostStateCursorFactory<
456        DatabaseHashedCursorFactory<&'a Provider::Tx>,
457        &'a Arc<HashedPostStateSorted>,
458    > as HashedCursorFactory>::AccountCursor<'a>
459    where
460        Self: 'a;
461
462    type StorageCursor<'a>
463        = <HashedPostStateCursorFactory<
464        DatabaseHashedCursorFactory<&'a Provider::Tx>,
465        &'a Arc<HashedPostStateSorted>,
466    > as HashedCursorFactory>::StorageCursor<'a>
467    where
468        Self: 'a;
469
470    fn hashed_account_cursor(&self) -> Result<Self::AccountCursor<'_>, DatabaseError> {
471        let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
472        let hashed_cursor_factory =
473            HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
474        hashed_cursor_factory.hashed_account_cursor()
475    }
476
477    fn hashed_storage_cursor(
478        &self,
479        hashed_address: B256,
480    ) -> Result<Self::StorageCursor<'_>, DatabaseError> {
481        let db_hashed_cursor_factory = DatabaseHashedCursorFactory::new(self.provider.tx_ref());
482        let hashed_cursor_factory =
483            HashedPostStateCursorFactory::new(db_hashed_cursor_factory, &self.hashed_post_state);
484        hashed_cursor_factory.hashed_storage_cursor(hashed_address)
485    }
486}