// reth_trie_db/changesets.rs
1//! Trie changeset computation and caching utilities.
2//!
3//! This module provides functionality to compute trie changesets for a given block,
4//! which represent the old trie node values before the block was processed.
5//!
6//! It also provides an efficient in-memory cache for these changesets, which is essential for:
7//! - **Reorg support**: Quickly access changesets to revert blocks during chain reorganizations
8//! - **Memory efficiency**: Automatic eviction ensures bounded memory usage
9
10use crate::{
11    DatabaseHashedCursorFactory, DatabaseStateRoot, DatabaseTrieCursorFactory, TrieTableAdapter,
12};
13use alloy_primitives::{map::B256Map, BlockNumber, B256};
14use parking_lot::RwLock;
15use reth_primitives_traits::FastInstant as Instant;
16use reth_storage_api::{
17    BlockNumReader, ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader,
18    StorageSettingsCache,
19};
20use reth_storage_errors::provider::{ProviderError, ProviderResult};
21use reth_trie::{
22    changesets::compute_trie_changesets,
23    trie_cursor::{InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory},
24    TrieInputSorted,
25};
26use reth_trie_common::updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted};
27use std::{
28    collections::BTreeMap,
29    fmt,
30    ops::RangeInclusive,
31    sync::{Arc, OnceLock},
32};
33use tracing::{debug, debug_span, warn};
34
35#[cfg(feature = "metrics")]
36use reth_metrics::{
37    metrics::{Counter, Gauge},
38    Metrics,
39};
40
/// Computes trie changesets for a block.
///
/// # Algorithm
///
/// For block N:
/// 1. Query cumulative `HashedPostState` revert for block N-1 (from db tip to after N-1)
/// 2. Use that to calculate cumulative `TrieUpdates` revert for block N-1
/// 3. Query per-block `HashedPostState` revert for block N
/// 4. Create prefix sets from the per-block revert (step 3)
/// 5. Create overlay with cumulative trie updates and cumulative state revert for N-1
/// 6. Calculate trie updates for block N using the overlay and per-block `HashedPostState`.
/// 7. Compute changesets using the N-1 overlay and the newly calculated trie updates for N
///
/// # Arguments
///
/// * `provider` - Database provider with changeset access
/// * `block_number` - Block number to compute changesets for
///
/// # Returns
///
/// Changesets (old trie node values) for the specified block
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip (based on Finish stage checkpoint)
/// - Database access fails
/// - State root computation fails
pub fn compute_block_trie_changesets<Provider>(
    provider: &Provider,
    block_number: BlockNumber,
) -> Result<TrieUpdatesSorted, ProviderError>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    // Monomorphize the inner implementation over the concrete `TrieTableAdapter`
    // type `A` supplied by the macro — presumably selected from the provider's
    // storage settings (macro defined elsewhere in this crate; confirm there).
    crate::with_adapter!(provider, |A| {
        compute_block_trie_changesets_inner::<_, A>(provider, block_number)
    })
}
85
86fn compute_block_trie_changesets_inner<Provider, A>(
87    provider: &Provider,
88    block_number: BlockNumber,
89) -> Result<TrieUpdatesSorted, ProviderError>
90where
91    Provider: DBProvider
92        + StageCheckpointReader
93        + ChangeSetReader
94        + StorageChangeSetReader
95        + BlockNumReader
96        + StorageSettingsCache,
97    A: TrieTableAdapter,
98{
99    debug!(
100        target: "trie::changeset_cache",
101        block_number,
102        "Computing block trie changesets from database state"
103    );
104
105    // Step 1: Collect/calculate state reverts
106
107    // This is just the changes from this specific block
108    let individual_state_revert =
109        crate::state::from_reverts_auto(provider, block_number..=block_number)?;
110
111    // This reverts all changes from db tip back to just after block was processed
112    let cumulative_state_revert = crate::state::from_reverts_auto(provider, (block_number + 1)..)?;
113
114    // This reverts all changes from db tip back to just after block-1 was processed
115    let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
116    cumulative_state_revert_prev.extend_ref_and_sort(&individual_state_revert);
117
118    // Step 2: Calculate cumulative trie updates revert for block-1
119    // This gives us the trie state as it was after block-1 was processed
120    let prefix_sets_prev = cumulative_state_revert_prev.construct_prefix_sets();
121    let input_prev = TrieInputSorted::new(
122        Arc::default(),
123        Arc::new(cumulative_state_revert_prev),
124        prefix_sets_prev,
125    );
126
127    type DbStateRoot<'a, TX, A> = reth_trie::StateRoot<
128        DatabaseTrieCursorFactory<&'a TX, A>,
129        DatabaseHashedCursorFactory<&'a TX>,
130    >;
131
132    let cumulative_trie_updates_prev =
133        DbStateRoot::<_, A>::overlay_root_from_nodes_with_updates(provider.tx_ref(), input_prev)
134            .map_err(ProviderError::other)?
135            .1
136            .into_sorted();
137
138    // Step 3: Create prefix sets from individual revert (only paths changed by this block)
139    let prefix_sets = individual_state_revert.construct_prefix_sets();
140
141    // Step 4: Calculate trie updates for block
142    // Use cumulative trie updates for block-1 as the node overlay and cumulative state for block
143    let input = TrieInputSorted::new(
144        Arc::new(cumulative_trie_updates_prev.clone()),
145        Arc::new(cumulative_state_revert),
146        prefix_sets,
147    );
148
149    let trie_updates =
150        DbStateRoot::<_, A>::overlay_root_from_nodes_with_updates(provider.tx_ref(), input)
151            .map_err(ProviderError::other)?
152            .1
153            .into_sorted();
154
155    // Step 5: Compute changesets using cumulative trie updates for block-1 as overlay
156    // Create an overlay cursor factory that has the trie state from after block-1
157    let db_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(provider.tx_ref());
158    let overlay_factory =
159        InMemoryTrieCursorFactory::new(db_cursor_factory, &cumulative_trie_updates_prev);
160
161    let changesets =
162        compute_trie_changesets(&overlay_factory, &trie_updates).map_err(ProviderError::other)?;
163
164    debug!(
165        target: "trie::changeset_cache",
166        block_number,
167        num_account_nodes = changesets.account_nodes_ref().len(),
168        num_storage_tries = changesets.storage_tries_ref().len(),
169        "Computed block trie changesets successfully"
170    );
171
172    Ok(changesets)
173}
174
/// Computes block trie updates using the changeset cache.
///
/// # Algorithm
///
/// For block N:
/// 1. Get cumulative trie reverts from block N+1 to db tip using the cache
/// 2. Create an overlay cursor factory with these reverts (representing trie state after block N)
/// 3. Walk through account trie changesets for block N
/// 4. For each changed path, look up the current value using the overlay cursor
/// 5. Walk through storage trie changesets for block N
/// 6. For each changed path, look up the current value using the overlay cursor
/// 7. Return the collected trie updates
///
/// # Arguments
///
/// * `cache` - Handle to the changeset cache for retrieving trie reverts
/// * `provider` - Database provider for accessing changesets and block data
/// * `block_number` - Block number to compute trie updates for
///
/// # Returns
///
/// Trie updates representing the state of trie nodes after the block was processed
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip
/// - Database access fails
/// - Cache retrieval fails
pub fn compute_block_trie_updates<Provider>(
    cache: &ChangesetCache,
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    // Monomorphize the inner implementation over the concrete `TrieTableAdapter`
    // type `A` supplied by the macro (same dispatch pattern as
    // `compute_block_trie_changesets`).
    crate::with_adapter!(provider, |A| {
        compute_block_trie_updates_inner::<_, A>(cache, provider, block_number)
    })
}
221
222fn compute_block_trie_updates_inner<Provider, A>(
223    cache: &ChangesetCache,
224    provider: &Provider,
225    block_number: BlockNumber,
226) -> ProviderResult<TrieUpdatesSorted>
227where
228    Provider: DBProvider
229        + StageCheckpointReader
230        + ChangeSetReader
231        + StorageChangeSetReader
232        + BlockNumReader
233        + StorageSettingsCache,
234    A: TrieTableAdapter,
235{
236    let tx = provider.tx_ref();
237
238    // Get the database tip block number
239    let db_tip_block = provider
240        .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
241        .as_ref()
242        .map(|chk| chk.block_number)
243        .ok_or_else(|| ProviderError::InsufficientChangesets {
244            requested: block_number,
245            available: 0..=0,
246        })?;
247
248    // Step 1: Get the block hash for the target block
249    let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
250        ProviderError::other(std::io::Error::new(
251            std::io::ErrorKind::NotFound,
252            format!("block hash not found for block number {}", block_number),
253        ))
254    })?;
255
256    // Step 2: Get the trie changesets for the target block from cache
257    let changesets = cache.get_or_compute(block_hash, block_number, provider)?;
258
259    // Step 3: Get the trie reverts for the state after the target block using the cache
260    let reverts = cache.get_or_compute_range(provider, (block_number + 1)..=db_tip_block)?;
261
262    // Step 4: Create an InMemoryTrieCursorFactory with the reverts
263    // This gives us the trie state as it was after the target block was processed
264    let db_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
265    let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
266
267    // Step 5: Collect all account trie nodes that changed in the target block
268    let account_nodes_ref = changesets.account_nodes_ref();
269    let mut account_nodes = Vec::with_capacity(account_nodes_ref.len());
270    let mut account_cursor = cursor_factory.account_trie_cursor()?;
271
272    // Iterate over the account nodes from the changesets
273    for (nibbles, _old_node) in account_nodes_ref {
274        // Look up the current value of this trie node using the overlay cursor
275        let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
276        account_nodes.push((*nibbles, node_value));
277    }
278
279    // Step 6: Collect all storage trie nodes that changed in the target block
280    let mut storage_tries = B256Map::default();
281
282    // Iterate over the storage tries from the changesets
283    for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
284        let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
285        let storage_nodes_ref = storage_changeset.storage_nodes_ref();
286        let mut storage_nodes = Vec::with_capacity(storage_nodes_ref.len());
287
288        // Iterate over the storage nodes for this account
289        for (nibbles, _old_node) in storage_nodes_ref {
290            // Look up the current value of this storage trie node
291            let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
292            storage_nodes.push((*nibbles, node_value));
293        }
294
295        storage_tries.insert(
296            *hashed_address,
297            StorageTrieUpdatesSorted { storage_nodes, is_deleted: storage_changeset.is_deleted },
298        );
299    }
300
301    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
302}
303
304/// A pending changeset computation that other threads can wait on.
305///
306/// When a deferred trie task starts computing changesets for a block, it registers
307/// a pending entry. If another thread needs the same changeset before the computation
308/// finishes, it waits on this entry instead of falling back to the expensive
309/// DB-based computation.
310struct PendingChangeset {
311    /// `None` when cancelled (e.g. due to panic), `Some(..)` when resolved with data.
312    result: OnceLock<Option<Arc<TrieUpdatesSorted>>>,
313}
314
315impl PendingChangeset {
316    const fn new() -> Self {
317        Self { result: OnceLock::new() }
318    }
319
320    /// Blocks until the computation finishes. Returns `Some` if resolved with data,
321    /// `None` if the computation was cancelled.
322    fn wait(&self) -> Option<Arc<TrieUpdatesSorted>> {
323        let _span =
324            debug_span!(target: "trie::changeset_cache", "waiting_for_pending_changeset").entered();
325        self.result.wait().clone()
326    }
327
328    /// Resolves the pending computation with the given result, waking all waiters.
329    fn resolve(&self, changesets: Arc<TrieUpdatesSorted>) {
330        let _ = self.result.set(Some(changesets));
331    }
332
333    /// Cancels the pending computation, waking all waiters so they fall through
334    /// to the DB fallback.
335    fn cancel(&self) {
336        let _ = self.result.set(None);
337    }
338}
339
340impl fmt::Debug for PendingChangeset {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        let is_resolved = self.result.get().is_some();
343        f.debug_struct("PendingChangeset").field("resolved", &is_resolved).finish()
344    }
345}
346
/// Thread-safe changeset cache.
///
/// This type wraps a shared, mutable reference to the cache inner.
/// The `RwLock` enables concurrent reads while ensuring exclusive access for writes.
///
/// Cloning is cheap: all clones share the same underlying state through the `Arc`.
#[derive(Debug, Clone)]
pub struct ChangesetCache {
    // Shared state: entries, block-number index, and pending-computation map.
    inner: Arc<RwLock<ChangesetCacheInner>>,
}

impl Default for ChangesetCache {
    /// Equivalent to [`ChangesetCache::new`].
    fn default() -> Self {
        Self::new()
    }
}
361
362impl ChangesetCache {
363    /// Creates a new cache.
364    ///
365    /// The cache has no capacity limit and relies on explicit eviction
366    /// via the `evict()` method to manage memory usage.
367    pub fn new() -> Self {
368        Self { inner: Arc::new(RwLock::new(ChangesetCacheInner::new())) }
369    }
370
371    /// Retrieves changesets for a block by hash.
372    ///
373    /// Returns `None` if the block is not in the cache (either evicted or never computed).
374    /// Updates hit/miss metrics accordingly.
375    pub fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
376        self.inner.read().get(block_hash)
377    }
378
    /// Inserts changesets for a block into the cache.
    ///
    /// Also resolves any pending computation for this block hash, waking threads
    /// that are waiting for the result.
    ///
    /// This method does not perform any eviction. Eviction must be explicitly
    /// triggered by calling `evict()`.
    ///
    /// # Arguments
    ///
    /// * `block_hash` - Hash of the block
    /// * `block_number` - Block number for tracking and eviction
    /// * `changesets` - Trie changesets to cache
    fn insert(&self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        // Insert and remove the pending entry under a single write lock, so a
        // concurrent reader always observes either the cache entry or the pending
        // entry — never neither.
        let pending = {
            let mut cache = self.inner.write();
            cache.insert(block_hash, block_number, Arc::clone(&changesets));
            cache.pending.remove(&block_hash)
        };

        // Resolve pending entry outside the write lock to avoid holding it
        // while waiters wake up.
        if let Some(pending) = pending {
            pending.resolve(changesets);
        }
    }
405
    /// Registers a pending changeset computation for the given block hash.
    ///
    /// Call this before starting changeset computation so that concurrent
    /// readers can wait for the result instead of falling back to the expensive
    /// DB-based computation.
    ///
    /// The returned [`PendingChangesetGuard`] must be used to resolve or cancel
    /// the pending entry. If dropped without resolving (e.g. due to a panic),
    /// the pending entry is automatically removed from the cache so that
    /// waiters fall through to the DB fallback.
    pub fn register_pending(&self, block_hash: B256) -> PendingChangesetGuard {
        let pending = Arc::new(PendingChangeset::new());
        let prev = self.inner.write().pending.insert(block_hash, Arc::clone(&pending));
        // Debug builds only: a duplicate registration would strand the previous
        // entry's waiters, so treat it as a logic error.
        debug_assert!(prev.is_none(), "duplicate pending changeset for {block_hash:?}");
        PendingChangesetGuard { cache: self.clone(), block_hash, pending: Some(pending) }
    }
422
423    /// Evicts changesets for blocks below the given block number.
424    ///
425    /// This should be called after blocks are persisted to the database to free
426    /// memory for changesets that are no longer needed in the cache.
427    ///
428    /// # Arguments
429    ///
430    /// * `up_to_block` - Evict blocks with number < this value. Blocks with number >= this value
431    ///   are retained.
432    pub fn evict(&self, up_to_block: BlockNumber) {
433        self.inner.write().evict(up_to_block)
434    }
435
    /// Gets changesets from cache, or computes them on-the-fly if missing.
    ///
    /// This is the primary API for retrieving changesets. It checks three sources in order:
    /// 1. **Cache hit** — returns immediately
    /// 2. **Pending computation** — blocks until the deferred trie task finishes
    /// 3. **DB fallback** — computes from database state (expensive)
    ///
    /// # Arguments
    ///
    /// * `block_hash` - Hash of the block to get changesets for
    /// * `block_number` - Block number (for cache insertion and logging)
    /// * `provider` - Database provider for DB access
    ///
    /// # Returns
    ///
    /// Changesets for the block, either from cache, a pending computation, or computed on-the-fly
    pub fn get_or_compute<P>(
        &self,
        block_hash: B256,
        block_number: u64,
        provider: &P,
    ) -> ProviderResult<Arc<TrieUpdatesSorted>>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader
            + StorageSettingsCache,
    {
        // Try cache first, and if missing, check for a pending computation.
        // The read lock is confined to this scope so it is not held while
        // waiting on a pending entry or computing from the database below.
        let pending = {
            let cache = self.inner.read();
            if let Some(changesets) = cache.get(&block_hash) {
                debug!(
                    target: "trie::changeset_cache",
                    ?block_hash,
                    block_number,
                    "Changeset cache HIT"
                );
                return Ok(changesets);
            }
            cache.pending.get(&block_hash).cloned()
        };

        // If there's a pending computation, wait for it instead of computing from DB.
        if let Some(pending) = pending {
            debug!(
                target: "trie::changeset_cache",
                ?block_hash,
                block_number,
                "Changeset cache MISS but pending computation found, waiting"
            );

            let start = Instant::now();

            if let Some(changesets) = pending.wait() {
                debug!(
                    target: "trie::changeset_cache",
                    ?block_hash,
                    block_number,
                    elapsed = ?start.elapsed(),
                    "Pending changeset resolved"
                );
                return Ok(changesets);
            }

            // `wait()` returned `None`: the producer was cancelled (e.g. it
            // panicked), so continue to the DB-based fallback below.
            debug!(
                target: "trie::changeset_cache",
                ?block_hash,
                block_number,
                elapsed = ?start.elapsed(),
                "Pending changeset was cancelled, falling through to DB computation"
            );
        }

        // No cache hit and no pending computation - compute from database
        warn!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset cache MISS, falling back to DB-based computation"
        );

        let start = Instant::now();

        // Compute changesets
        let changesets = compute_block_trie_changesets(provider, block_number)?;

        let changesets = Arc::new(changesets);
        let elapsed = start.elapsed();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            block_number,
            ?block_hash,
            "Changeset computed from database and inserting into cache"
        );

        // Store in cache (with write lock)
        self.insert(block_hash, block_number, Arc::clone(&changesets));

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset successfully cached"
        );

        Ok(changesets)
    }
548
    /// Gets or computes accumulated trie reverts for a range of blocks.
    ///
    /// This method retrieves and accumulates all trie changesets (reverts) for the specified
    /// block range (inclusive). The changesets are accumulated in reverse order (newest to oldest)
    /// so that older values take precedence when there are conflicts.
    ///
    /// # Arguments
    ///
    /// * `provider` - Database provider for DB access and block lookups
    /// * `range` - Block range to accumulate reverts for (inclusive)
    ///
    /// # Returns
    ///
    /// Accumulated trie reverts for all blocks in the specified range
    ///
    /// # Errors
    ///
    /// Returns error if:
    /// - Any block in the range is beyond the database tip
    /// - Database access fails
    /// - Block hash lookup fails
    /// - Changeset computation fails
    pub fn get_or_compute_range<P>(
        &self,
        provider: &P,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<TrieUpdatesSorted>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader
            + StorageSettingsCache,
    {
        // Get the database tip block number from the Finish stage checkpoint;
        // without a checkpoint no changesets are available at all.
        let db_tip_block = provider
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: *range.start(),
                available: 0..=0,
            })?;

        let start_block = *range.start();
        let end_block = *range.end();

        // If range end is beyond the tip, return an error
        if end_block > db_tip_block {
            return Err(ProviderError::InsufficientChangesets {
                requested: end_block,
                available: 0..=db_tip_block,
            });
        }

        let timer = Instant::now();

        debug!(
            target: "trie::changeset_cache",
            start_block,
            end_block,
            db_tip_block,
            "Starting get_or_compute_range"
        );

        // Use changeset cache to retrieve and accumulate reverts block by block.
        // Iterate in reverse order (newest to oldest) so that older changesets
        // take precedence when there are conflicting updates.
        // An empty range (start > end) yields no iterations and returns the
        // default (empty) updates.
        let mut accumulated_reverts = TrieUpdatesSorted::default();

        for block_number in range.rev() {
            // Get the block hash for this block number
            let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
                ProviderError::other(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("block hash not found for block number {}", block_number),
                ))
            })?;

            debug!(
                target: "trie::changeset_cache",
                block_number,
                ?block_hash,
                "Looked up block hash for block number in range"
            );

            // Get changesets from cache (or compute on-the-fly)
            let changesets = self.get_or_compute(block_hash, block_number, provider)?;

            // Overlay this block's changesets on top of accumulated reverts.
            // Since we iterate newest to oldest, older values are added last
            // and overwrite any conflicting newer values (oldest changeset values take
            // precedence).
            accumulated_reverts.extend_ref_and_sort(&changesets);
        }

        let elapsed = timer.elapsed();

        let num_account_nodes = accumulated_reverts.account_nodes_ref().len();
        let num_storage_tries = accumulated_reverts.storage_tries_ref().len();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            start_block,
            end_block,
            num_blocks = end_block.saturating_sub(start_block).saturating_add(1),
            num_account_nodes,
            num_storage_tries,
            "Finished accumulating trie reverts for block range"
        );

        Ok(accumulated_reverts)
    }
664}
665
/// Guard for a pending changeset computation.
///
/// Returned by [`ChangesetCache::register_pending`]. Must be resolved via [`Self::resolve`]
/// to insert the computed changesets into the cache and wake waiting threads.
///
/// If dropped without resolving (e.g. due to a panic), the pending entry is automatically
/// cancelled so waiters fall through to the DB fallback.
#[must_use = "call .resolve() to insert changesets into the cache"]
pub struct PendingChangesetGuard {
    // Cache in which the pending entry was registered.
    cache: ChangesetCache,
    // Hash of the block whose changesets are being computed.
    block_hash: B256,
    /// `None` after [`Self::resolve`] has been called.
    pending: Option<Arc<PendingChangeset>>,
}

impl PendingChangesetGuard {
    /// Resolves the pending computation by inserting the changesets into the cache
    /// and waking all waiting threads.
    pub fn resolve(mut self, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        // `ChangesetCache::insert` also removes and resolves the pending entry.
        self.cache.insert(self.block_hash, block_number, changesets);
        // Clearing `pending` turns the subsequent `Drop` into a no-op.
        self.pending = None;
    }
}

impl fmt::Debug for PendingChangesetGuard {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("PendingChangesetGuard").field("block_hash", &self.block_hash).finish()
    }
}
695
696impl Drop for PendingChangesetGuard {
697    fn drop(&mut self) {
698        let Some(pending) = self.pending.take() else {
699            // Guard was resolved successfully already, no-op
700            return
701        };
702
703        let removed = self.cache.inner.write().pending.remove(&self.block_hash);
704        if let Some(removed) = removed {
705            if Arc::ptr_eq(&removed, &pending) {
706                debug!(
707                    target: "trie::changeset_cache",
708                    block_hash = ?self.block_hash,
709                    "Pending changeset dropped without resolution, cancelling"
710                );
711                removed.cancel();
712            } else {
713                // Put it back — it belongs to a different registration.
714                self.cache.inner.write().pending.insert(self.block_hash, removed);
715            }
716        }
717    }
718}
719
/// In-memory cache for trie changesets with explicit eviction policy.
///
/// Holds changesets for blocks that have been validated but not yet persisted.
/// Keyed by block hash for fast lookup during reorgs. Eviction is controlled
/// explicitly by the engine API tree handler when persistence completes.
///
/// ## Eviction Policy
///
/// Unlike traditional caches with automatic eviction, this cache requires explicit
/// eviction calls. The engine API tree handler calls `evict(block_number)` after
/// blocks are persisted to the database, ensuring changesets remain available
/// until their corresponding blocks are safely on disk.
///
/// ## Metrics
///
/// The cache maintains several metrics for observability:
/// - `hits`: Number of successful cache lookups
/// - `misses`: Number of failed cache lookups
/// - `evictions`: Number of blocks evicted
/// - `size`: Current number of cached blocks
#[derive(Debug)]
struct ChangesetCacheInner {
    /// Cache entries: block hash -> (block number, changesets)
    entries: B256Map<(u64, Arc<TrieUpdatesSorted>)>,

    /// Block number to hashes mapping for eviction.
    /// `BTreeMap` keeps the keys ordered so `evict` can range-scan below a threshold.
    block_numbers: BTreeMap<u64, Vec<B256>>,

    /// Pending changeset computations: block hash -> pending entry.
    /// Threads waiting on a pending entry will block until it's resolved.
    pending: B256Map<Arc<PendingChangeset>>,

    /// Metrics for monitoring cache behavior
    #[cfg(feature = "metrics")]
    metrics: ChangesetCacheMetrics,
}
756
#[cfg(feature = "metrics")]
/// Metrics for the changeset cache.
///
/// These metrics provide visibility into cache performance and help identify
/// potential issues like high miss rates.
///
/// Registered under the `trie.changeset_cache` scope via the `Metrics` derive.
#[derive(Metrics, Clone)]
#[metrics(scope = "trie.changeset_cache")]
struct ChangesetCacheMetrics {
    /// Cache hit counter
    hits: Counter,

    /// Cache miss counter
    misses: Counter,

    /// Eviction counter
    evictions: Counter,

    /// Current cache size (number of entries)
    size: Gauge,
}
777
impl Default for ChangesetCacheInner {
    /// Equivalent to [`Self::new`]: an empty cache with no pending entries.
    fn default() -> Self {
        Self::new()
    }
}
783
784impl ChangesetCacheInner {
    /// Creates a new empty changeset cache.
    ///
    /// The cache has no capacity limit and relies on explicit eviction
    /// via the `evict()` method to manage memory usage.
    fn new() -> Self {
        Self {
            entries: B256Map::default(),
            block_numbers: BTreeMap::new(),
            pending: B256Map::default(),
            // Metrics are registered lazily through the `Metrics` derive's Default impl.
            #[cfg(feature = "metrics")]
            metrics: Default::default(),
        }
    }
798
799    fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
800        match self.entries.get(block_hash) {
801            Some((_, changesets)) => {
802                #[cfg(feature = "metrics")]
803                self.metrics.hits.increment(1);
804                Some(Arc::clone(changesets))
805            }
806            None => {
807                #[cfg(feature = "metrics")]
808                self.metrics.misses.increment(1);
809                None
810            }
811        }
812    }
813
    /// Inserts an entry keyed by block hash and indexes it by block number so
    /// `evict` can later remove it by number.
    fn insert(&mut self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_before = self.entries.len(),
            "Inserting changeset into cache"
        );

        // Insert the entry
        self.entries.insert(block_hash, (block_number, changesets));

        // Add block hash to block_numbers mapping
        self.block_numbers.entry(block_number).or_default().push(block_hash);

        // Update size metric
        #[cfg(feature = "metrics")]
        self.metrics.size.set(self.entries.len() as f64);

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_after = self.entries.len(),
            "Changeset inserted into cache"
        );
    }
841
842    fn evict(&mut self, up_to_block: BlockNumber) {
843        debug!(
844            target: "trie::changeset_cache",
845            up_to_block,
846            cache_size_before = self.entries.len(),
847            "Starting cache eviction"
848        );
849
850        // Find all block numbers that should be evicted (< up_to_block)
851        let blocks_to_evict: Vec<u64> =
852            self.block_numbers.range(..up_to_block).map(|(num, _)| *num).collect();
853
854        // Remove entries for each block number below threshold
855        #[cfg(feature = "metrics")]
856        let mut evicted_count = 0;
857        #[cfg(not(feature = "metrics"))]
858        let mut evicted_count = 0;
859
860        for block_number in &blocks_to_evict {
861            if let Some(hashes) = self.block_numbers.remove(block_number) {
862                debug!(
863                    target: "trie::changeset_cache",
864                    block_number,
865                    num_hashes = hashes.len(),
866                    "Evicting block from cache"
867                );
868                for hash in hashes {
869                    if self.entries.remove(&hash).is_some() {
870                        evicted_count += 1;
871                    }
872                }
873            }
874        }
875
876        debug!(
877            target: "trie::changeset_cache",
878            up_to_block,
879            evicted_count,
880            cache_size_after = self.entries.len(),
881            "Finished cache eviction"
882        );
883
884        // Update metrics if we evicted anything
885        #[cfg(feature = "metrics")]
886        if evicted_count > 0 {
887            self.metrics.evictions.increment(evicted_count as u64);
888            self.metrics.size.set(self.entries.len() as f64);
889        }
890    }
891}
892
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::{B256Map, HashMap};

    /// Builds an empty [`TrieUpdatesSorted`] wrapped in an [`Arc`] for cache tests.
    fn create_test_changesets() -> Arc<TrieUpdatesSorted> {
        Arc::new(TrieUpdatesSorted::new(vec![], B256Map::default()))
    }

    #[test]
    fn test_insert_and_retrieve_single_entry() {
        let mut cache = ChangesetCacheInner::new();
        let block_hash = B256::random();

        cache.insert(block_hash, 100, create_test_changesets());

        // The entry must be retrievable and counted.
        assert!(cache.get(&block_hash).is_some());
        assert_eq!(cache.entries.len(), 1);
    }

    #[test]
    fn test_insert_multiple_entries() {
        let mut cache = ChangesetCacheInner::new();

        // Populate ten consecutive blocks starting at height 100.
        let inserted: Vec<B256> = (0..10)
            .map(|offset| {
                let hash = B256::random();
                cache.insert(hash, 100 + offset, create_test_changesets());
                hash
            })
            .collect();

        // Every inserted hash must remain retrievable.
        assert_eq!(cache.entries.len(), 10);
        assert!(inserted.iter().all(|hash| cache.get(hash).is_some()));
    }

    #[test]
    fn test_eviction_when_explicitly_called() {
        let mut cache = ChangesetCacheInner::new();

        // Insert blocks 0 through 14.
        let blocks: Vec<(u64, B256)> = (0..15)
            .map(|number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                (number, hash)
            })
            .collect();

        // Nothing is evicted automatically on insert.
        assert_eq!(cache.entries.len(), 15);

        // Drop everything strictly below height 4.
        cache.evict(4);

        // Blocks 4..=14 remain: 11 entries.
        assert_eq!(cache.entries.len(), 11);

        for (number, hash) in &blocks {
            if *number < 4 {
                assert!(cache.get(hash).is_none(), "Block {} should be evicted", number);
            } else {
                assert!(cache.get(hash).is_some(), "Block {} should be present", number);
            }
        }
    }

    #[test]
    fn test_eviction_with_persistence_watermark() {
        let mut cache = ChangesetCacheInner::new();

        // Insert blocks 100 through 165 inclusive.
        let mut hash_by_number = HashMap::new();
        for number in 100..=165 {
            let hash = B256::random();
            cache.insert(hash, number, create_test_changesets());
            hash_by_number.insert(number, hash);
        }

        // No automatic eviction: all 66 blocks present.
        assert_eq!(cache.entries.len(), 66);

        // Persisted to 164 with a 64-block retention window: threshold = 164 - 64 = 100,
        // so every cached block (100..=165) survives.
        cache.evict(100);
        assert_eq!(cache.entries.len(), 66);

        // Persisted to 165: threshold = 165 - 64 = 101, dropping only block 100.
        cache.evict(101);
        assert_eq!(cache.entries.len(), 65);
        assert!(cache.get(&hash_by_number[&100]).is_none());
        assert!(cache.get(&hash_by_number[&101]).is_some());
    }

    #[test]
    fn test_out_of_order_inserts_with_explicit_eviction() {
        let mut cache = ChangesetCacheInner::new();

        // Heights arrive out of order; the cache must index by number regardless.
        let hash_3 = B256::random();
        let hash_5 = B256::random();
        let hash_10 = B256::random();
        let hash_15 = B256::random();
        for (number, hash) in [(10, hash_10), (5, hash_5), (15, hash_15), (3, hash_3)] {
            cache.insert(hash, number, create_test_changesets());
        }

        // No automatic eviction on insert.
        assert_eq!(cache.entries.len(), 4);

        // Explicitly evict blocks strictly below 5.
        cache.evict(5);

        assert!(cache.get(&hash_3).is_none(), "Block 3 should be evicted");
        assert!(cache.get(&hash_5).is_some(), "Block 5 should be present");
        assert!(cache.get(&hash_10).is_some(), "Block 10 should be present");
        assert!(cache.get(&hash_15).is_some(), "Block 15 should be present");
    }

    #[test]
    fn test_multiple_blocks_same_number() {
        let mut cache = ChangesetCacheInner::new();

        // Two competing forks at the same height must coexist in the cache.
        let fork_a = B256::random();
        let fork_b = B256::random();
        cache.insert(fork_a, 100, create_test_changesets());
        cache.insert(fork_b, 100, create_test_changesets());

        assert!(cache.get(&fork_a).is_some());
        assert!(cache.get(&fork_b).is_some());
        assert_eq!(cache.entries.len(), 2);
    }

    #[test]
    fn test_eviction_removes_all_side_chains() {
        let mut cache = ChangesetCacheInner::new();

        // Three forks at height 10 plus one block at height 20.
        let forks = [B256::random(), B256::random(), B256::random()];
        for fork in forks {
            cache.insert(fork, 10, create_test_changesets());
        }
        let canonical = B256::random();
        cache.insert(canonical, 20, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        // Evicting below 15 must drop every fork at height 10 at once.
        cache.evict(15);

        assert_eq!(cache.entries.len(), 1);
        for fork in &forks {
            assert!(cache.get(fork).is_none());
        }
        assert!(cache.get(&canonical).is_some());
    }
}
1071}