// reth_trie_db/changesets.rs
//! Trie changeset computation and caching utilities.
//!
//! This module provides functionality to compute trie changesets for a given block,
//! which represent the old trie node values before the block was processed.
//!
//! It also provides an efficient in-memory cache for these changesets, which is essential for:
//! - **Reorg support**: Quickly access changesets to revert blocks during chain reorganizations
//! - **Memory efficiency**: Explicit eviction after persistence keeps memory usage bounded

10use crate::{
11    DatabaseHashedCursorFactory, DatabaseStateRoot, DatabaseTrieCursorFactory, TrieTableAdapter,
12};
13use alloy_primitives::{map::B256Map, BlockNumber, B256};
14use parking_lot::RwLock;
15use reth_primitives_traits::FastInstant as Instant;
16use reth_storage_api::{
17    BlockNumReader, ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader,
18    StorageSettingsCache,
19};
20use reth_storage_errors::provider::{ProviderError, ProviderResult};
21use reth_trie::{
22    changesets::compute_trie_changesets,
23    trie_cursor::{InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory},
24    TrieInputSorted,
25};
26use reth_trie_common::updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted};
27use std::{
28    collections::BTreeMap,
29    fmt,
30    ops::RangeInclusive,
31    sync::{Arc, OnceLock},
32};
33use tracing::{debug, debug_span, warn};
34
35#[cfg(feature = "metrics")]
36use reth_metrics::{
37    metrics::{Counter, Gauge},
38    Metrics,
39};
40
/// Computes trie changesets for a block.
///
/// # Algorithm
///
/// For block N:
/// 1. Query cumulative `HashedPostState` revert for block N-1 (from db tip to after N-1)
/// 2. Use that to calculate cumulative `TrieUpdates` revert for block N-1
/// 3. Query per-block `HashedPostState` revert for block N
/// 4. Create prefix sets from the per-block revert (step 3)
/// 5. Create overlay with cumulative trie updates and cumulative state revert for N-1
/// 6. Calculate trie updates for block N using the overlay and per-block `HashedPostState`.
/// 7. Compute changesets using the N-1 overlay and the newly calculated trie updates for N
///
/// # Arguments
///
/// * `provider` - Database provider with changeset access
/// * `block_number` - Block number to compute changesets for
///
/// # Returns
///
/// Changesets (old trie node values) for the specified block
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip (based on Finish stage checkpoint)
/// - Database access fails
/// - State root computation fails
pub fn compute_block_trie_changesets<Provider>(
    provider: &Provider,
    block_number: BlockNumber,
) -> Result<TrieUpdatesSorted, ProviderError>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    // Select the concrete trie table adapter type `A` based on the provider's
    // storage settings, then run the adapter-generic implementation.
    crate::with_adapter!(provider, |A| {
        compute_block_trie_changesets_inner::<_, A>(provider, block_number)
    })
}
85
86fn compute_block_trie_changesets_inner<Provider, A>(
87    provider: &Provider,
88    block_number: BlockNumber,
89) -> Result<TrieUpdatesSorted, ProviderError>
90where
91    Provider: DBProvider
92        + StageCheckpointReader
93        + ChangeSetReader
94        + StorageChangeSetReader
95        + BlockNumReader
96        + StorageSettingsCache,
97    A: TrieTableAdapter,
98{
99    debug!(
100        target: "trie::changeset_cache",
101        block_number,
102        "Computing block trie changesets from database state"
103    );
104
105    // Step 1: Collect/calculate state reverts
106
107    // This is just the changes from this specific block
108    let individual_state_revert =
109        crate::state::from_reverts_auto(provider, block_number..=block_number)?;
110
111    // This reverts all changes from db tip back to just after block was processed
112    let cumulative_state_revert = crate::state::from_reverts_auto(provider, (block_number + 1)..)?;
113
114    // This reverts all changes from db tip back to just after block-1 was processed
115    let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
116    cumulative_state_revert_prev.extend_ref_and_sort(&individual_state_revert);
117
118    // Step 2: Calculate cumulative trie updates revert for block-1
119    // This gives us the trie state as it was after block-1 was processed
120    let prefix_sets_prev = cumulative_state_revert_prev.construct_prefix_sets();
121    let input_prev = TrieInputSorted::new(
122        Arc::default(),
123        Arc::new(cumulative_state_revert_prev),
124        prefix_sets_prev,
125    );
126
127    type DbStateRoot<'a, TX, A> = reth_trie::StateRoot<
128        DatabaseTrieCursorFactory<&'a TX, A>,
129        DatabaseHashedCursorFactory<&'a TX>,
130    >;
131
132    let cumulative_trie_updates_prev =
133        DbStateRoot::<_, A>::overlay_root_from_nodes_with_updates(provider.tx_ref(), input_prev)
134            .map_err(ProviderError::other)?
135            .1
136            .into_sorted();
137
138    // Step 3: Create prefix sets from individual revert (only paths changed by this block)
139    let prefix_sets = individual_state_revert.construct_prefix_sets();
140
141    // Step 4: Calculate trie updates for block
142    // Use cumulative trie updates for block-1 as the node overlay and cumulative state for block
143    let input = TrieInputSorted::new(
144        Arc::new(cumulative_trie_updates_prev.clone()),
145        Arc::new(cumulative_state_revert),
146        prefix_sets,
147    );
148
149    let trie_updates =
150        DbStateRoot::<_, A>::overlay_root_from_nodes_with_updates(provider.tx_ref(), input)
151            .map_err(ProviderError::other)?
152            .1
153            .into_sorted();
154
155    // Step 5: Compute changesets using cumulative trie updates for block-1 as overlay
156    // Create an overlay cursor factory that has the trie state from after block-1
157    let db_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(provider.tx_ref());
158    let overlay_factory =
159        InMemoryTrieCursorFactory::new(db_cursor_factory, &cumulative_trie_updates_prev);
160
161    let changesets =
162        compute_trie_changesets(&overlay_factory, &trie_updates).map_err(ProviderError::other)?;
163
164    debug!(
165        target: "trie::changeset_cache",
166        block_number,
167        num_account_nodes = changesets.account_nodes_ref().len(),
168        num_storage_tries = changesets.storage_tries_ref().len(),
169        "Computed block trie changesets successfully"
170    );
171
172    Ok(changesets)
173}
174
/// Computes block trie updates using the changeset cache.
///
/// # Algorithm
///
/// For block N:
/// 1. Get cumulative trie reverts from block N+1 to db tip using the cache
/// 2. Create an overlay cursor factory with these reverts (representing trie state after block N)
/// 3. Walk through account trie changesets for block N
/// 4. For each changed path, look up the current value using the overlay cursor
/// 5. Walk through storage trie changesets for block N
/// 6. For each changed path, look up the current value using the overlay cursor
/// 7. Return the collected trie updates
///
/// # Arguments
///
/// * `cache` - Handle to the changeset cache for retrieving trie reverts
/// * `provider` - Database provider for accessing changesets and block data
/// * `block_number` - Block number to compute trie updates for
///
/// # Returns
///
/// Trie updates representing the state of trie nodes after the block was processed
///
/// # Errors
///
/// Returns error if:
/// - Block number exceeds database tip
/// - Database access fails
/// - Cache retrieval fails
pub fn compute_block_trie_updates<Provider>(
    cache: &ChangesetCache,
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    // Select the concrete trie table adapter type `A` based on the provider's
    // storage settings, then run the adapter-generic implementation.
    crate::with_adapter!(provider, |A| {
        compute_block_trie_updates_inner::<_, A>(cache, provider, block_number)
    })
}
221
222fn compute_block_trie_updates_inner<Provider, A>(
223    cache: &ChangesetCache,
224    provider: &Provider,
225    block_number: BlockNumber,
226) -> ProviderResult<TrieUpdatesSorted>
227where
228    Provider: DBProvider
229        + StageCheckpointReader
230        + ChangeSetReader
231        + StorageChangeSetReader
232        + BlockNumReader
233        + StorageSettingsCache,
234    A: TrieTableAdapter,
235{
236    let tx = provider.tx_ref();
237
238    // Get the database tip block number
239    let db_tip_block = provider
240        .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
241        .as_ref()
242        .map(|chk| chk.block_number)
243        .ok_or_else(|| ProviderError::InsufficientChangesets {
244            requested: block_number,
245            available: 0..=0,
246        })?;
247
248    // Step 1: Get the block hash for the target block
249    let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
250        ProviderError::other(std::io::Error::new(
251            std::io::ErrorKind::NotFound,
252            format!("block hash not found for block number {}", block_number),
253        ))
254    })?;
255
256    // Step 2: Get the trie changesets for the target block from cache
257    let changesets = cache.get_or_compute(block_hash, block_number, provider)?;
258
259    // Step 3: Get the trie reverts for the state after the target block using the cache
260    let reverts = cache.get_or_compute_range(provider, (block_number + 1)..=db_tip_block)?;
261
262    // Step 4: Create an InMemoryTrieCursorFactory with the reverts
263    // This gives us the trie state as it was after the target block was processed
264    let db_cursor_factory = DatabaseTrieCursorFactory::<_, A>::new(tx);
265    let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
266
267    // Step 5: Collect all account trie nodes that changed in the target block
268    let account_nodes_ref = changesets.account_nodes_ref();
269    let mut account_nodes = Vec::with_capacity(account_nodes_ref.len());
270    let mut account_cursor = cursor_factory.account_trie_cursor()?;
271
272    // Iterate over the account nodes from the changesets
273    for (nibbles, _old_node) in account_nodes_ref {
274        // Look up the current value of this trie node using the overlay cursor
275        let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
276        account_nodes.push((*nibbles, node_value));
277    }
278
279    // Step 6: Collect all storage trie nodes that changed in the target block
280    let mut storage_tries = B256Map::default();
281
282    // Iterate over the storage tries from the changesets
283    for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
284        let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
285        let storage_nodes_ref = storage_changeset.storage_nodes_ref();
286        let mut storage_nodes = Vec::with_capacity(storage_nodes_ref.len());
287
288        // Iterate over the storage nodes for this account
289        for (nibbles, _old_node) in storage_nodes_ref {
290            // Look up the current value of this storage trie node
291            let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
292            storage_nodes.push((*nibbles, node_value));
293        }
294
295        storage_tries.insert(
296            *hashed_address,
297            StorageTrieUpdatesSorted { storage_nodes, is_deleted: storage_changeset.is_deleted },
298        );
299    }
300
301    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
302}
303
304/// A pending changeset computation that other threads can wait on.
305///
306/// When a deferred trie task starts computing changesets for a block, it registers
307/// a pending entry. If another thread needs the same changeset before the computation
308/// finishes, it waits on this entry instead of falling back to the expensive
309/// DB-based computation.
310struct PendingChangeset {
311    /// `None` when cancelled (e.g. due to panic), `Some(..)` when resolved with data.
312    result: OnceLock<Option<Arc<TrieUpdatesSorted>>>,
313}
314
315impl PendingChangeset {
316    const fn new() -> Self {
317        Self { result: OnceLock::new() }
318    }
319
320    /// Blocks until the computation finishes. Returns `Some` if resolved with data,
321    /// `None` if the computation was cancelled.
322    fn wait(&self) -> Option<Arc<TrieUpdatesSorted>> {
323        let _span =
324            debug_span!(target: "trie::changeset_cache", "waiting_for_pending_changeset").entered();
325        self.result.wait().clone()
326    }
327
328    /// Resolves the pending computation with the given result, waking all waiters.
329    fn resolve(&self, changesets: Arc<TrieUpdatesSorted>) {
330        let _ = self.result.set(Some(changesets));
331    }
332
333    /// Cancels the pending computation, waking all waiters so they fall through
334    /// to the DB fallback.
335    fn cancel(&self) {
336        let _ = self.result.set(None);
337    }
338}
339
340impl fmt::Debug for PendingChangeset {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        let is_resolved = self.result.get().is_some();
343        f.debug_struct("PendingChangeset").field("resolved", &is_resolved).finish()
344    }
345}
346
/// Thread-safe changeset cache.
///
/// This type wraps a shared, mutable reference to the cache inner.
/// The `RwLock` enables concurrent reads while ensuring exclusive access for writes.
#[derive(Debug, Clone)]
pub struct ChangesetCache {
    // Shared state; cloning the handle shares the same underlying cache.
    inner: Arc<RwLock<ChangesetCacheInner>>,
}
355
impl Default for ChangesetCache {
    /// Equivalent to [`ChangesetCache::new`]: an empty cache with no capacity limit.
    fn default() -> Self {
        Self::new()
    }
}
361
impl ChangesetCache {
    /// Creates a new cache.
    ///
    /// The cache has no capacity limit and relies on explicit eviction
    /// via the `evict()` method to manage memory usage.
    pub fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(ChangesetCacheInner::new())) }
    }
370
371    /// Retrieves changesets for a block by hash.
372    ///
373    /// Returns `None` if the block is not in the cache (either evicted or never computed).
374    /// Updates hit/miss metrics accordingly.
375    pub fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
376        self.inner.read().get(block_hash)
377    }
378
379    /// Inserts changesets for a block into the cache.
380    ///
381    /// Also resolves any pending computation for this block hash, waking threads
382    /// that are waiting for the result.
383    ///
384    /// This method does not perform any eviction. Eviction must be explicitly
385    /// triggered by calling `evict()`.
386    ///
387    /// # Arguments
388    ///
389    /// * `block_hash` - Hash of the block
390    /// * `block_number` - Block number for tracking and eviction
391    /// * `changesets` - Trie changesets to cache
392    fn insert(&self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
393        let pending = {
394            let mut cache = self.inner.write();
395            cache.insert(block_hash, block_number, Arc::clone(&changesets));
396            cache.pending.remove(&block_hash)
397        };
398
399        // Resolve pending entry outside the write lock to avoid holding it
400        // while waiters wake up.
401        if let Some(pending) = pending {
402            pending.resolve(changesets);
403        }
404    }
405
406    /// Registers a pending changeset computation for the given block hash.
407    ///
408    /// Call this before starting changeset computation so that concurrent
409    /// readers can wait for the result instead of falling back to the expensive
410    /// DB-based computation.
411    ///
412    /// The returned [`PendingChangesetGuard`] must be used to resolve or cancel
413    /// the pending entry. If dropped without resolving (e.g. due to a panic),
414    /// the pending entry is automatically removed from the cache so that
415    /// waiters fall through to the DB fallback.
416    pub fn register_pending(&self, block_hash: B256) -> PendingChangesetGuard {
417        let pending = Arc::new(PendingChangeset::new());
418        let prev = self.inner.write().pending.insert(block_hash, Arc::clone(&pending));
419        debug_assert!(prev.is_none(), "duplicate pending changeset for {block_hash:?}");
420        PendingChangesetGuard { cache: self.clone(), block_hash, pending: Some(pending) }
421    }
422
    /// Evicts changesets for blocks below the given block number.
    ///
    /// This should be called after blocks are persisted to the database to free
    /// memory for changesets that are no longer needed in the cache.
    ///
    /// # Arguments
    ///
    /// * `up_to_block` - Evict blocks with number < this value. Blocks with number >= this value
    ///   are retained.
    pub fn evict(&self, up_to_block: BlockNumber) {
        // Delegates to the inner cache under the write lock.
        self.inner.write().evict(up_to_block)
    }
435
436    /// Gets changesets from cache, or computes them on-the-fly if missing.
437    ///
438    /// This is the primary API for retrieving changesets. It checks three sources in order:
439    /// 1. **Cache hit** — returns immediately
440    /// 2. **Pending computation** — blocks until the deferred trie task finishes
441    /// 3. **DB fallback** — computes from database state (expensive)
442    ///
443    /// # Arguments
444    ///
445    /// * `block_hash` - Hash of the block to get changesets for
446    /// * `block_number` - Block number (for cache insertion and logging)
447    /// * `provider` - Database provider for DB access
448    ///
449    /// # Returns
450    ///
451    /// Changesets for the block, either from cache, a pending computation, or computed on-the-fly
452    pub fn get_or_compute<P>(
453        &self,
454        block_hash: B256,
455        block_number: u64,
456        provider: &P,
457    ) -> ProviderResult<Arc<TrieUpdatesSorted>>
458    where
459        P: DBProvider
460            + StageCheckpointReader
461            + ChangeSetReader
462            + StorageChangeSetReader
463            + BlockNumReader
464            + StorageSettingsCache,
465    {
466        // Try cache first, and if missing, check for a pending computation.
467        let pending = {
468            let cache = self.inner.read();
469            if let Some(changesets) = cache.get(&block_hash) {
470                debug!(
471                    target: "trie::changeset_cache",
472                    ?block_hash,
473                    block_number,
474                    "Changeset cache HIT"
475                );
476                return Ok(changesets);
477            }
478            cache.pending.get(&block_hash).cloned()
479        };
480
481        // If there's a pending computation, wait for it instead of computing from DB.
482        if let Some(pending) = pending {
483            debug!(
484                target: "trie::changeset_cache",
485                ?block_hash,
486                block_number,
487                "Changeset cache MISS but pending computation found, waiting"
488            );
489
490            let start = Instant::now();
491
492            if let Some(changesets) = pending.wait() {
493                debug!(
494                    target: "trie::changeset_cache",
495                    ?block_hash,
496                    block_number,
497                    elapsed = ?start.elapsed(),
498                    "Pending changeset resolved"
499                );
500                return Ok(changesets);
501            }
502
503            debug!(
504                target: "trie::changeset_cache",
505                ?block_hash,
506                block_number,
507                elapsed = ?start.elapsed(),
508                "Pending changeset was cancelled, falling through to DB computation"
509            );
510        }
511
512        // No cache hit and no pending computation - compute from database
513        warn!(
514            target: "trie::changeset_cache",
515            ?block_hash,
516            block_number,
517            "Changeset cache MISS, falling back to DB-based computation"
518        );
519
520        let start = Instant::now();
521
522        // Compute changesets
523        let changesets =
524            compute_block_trie_changesets(provider, block_number).map_err(ProviderError::other)?;
525
526        let changesets = Arc::new(changesets);
527        let elapsed = start.elapsed();
528
529        debug!(
530            target: "trie::changeset_cache",
531            ?elapsed,
532            block_number,
533            ?block_hash,
534            "Changeset computed from database and inserting into cache"
535        );
536
537        // Store in cache (with write lock)
538        self.insert(block_hash, block_number, Arc::clone(&changesets));
539
540        debug!(
541            target: "trie::changeset_cache",
542            ?block_hash,
543            block_number,
544            "Changeset successfully cached"
545        );
546
547        Ok(changesets)
548    }
549
550    /// Gets or computes accumulated trie reverts for a range of blocks.
551    ///
552    /// This method retrieves and accumulates all trie changesets (reverts) for the specified
553    /// block range (inclusive). The changesets are accumulated in reverse order (newest to oldest)
554    /// so that older values take precedence when there are conflicts.
555    ///
556    /// # Arguments
557    ///
558    /// * `provider` - Database provider for DB access and block lookups
559    /// * `range` - Block range to accumulate reverts for (inclusive)
560    ///
561    /// # Returns
562    ///
563    /// Accumulated trie reverts for all blocks in the specified range
564    ///
565    /// # Errors
566    ///
567    /// Returns error if:
568    /// - Any block in the range is beyond the database tip
569    /// - Database access fails
570    /// - Block hash lookup fails
571    /// - Changeset computation fails
572    pub fn get_or_compute_range<P>(
573        &self,
574        provider: &P,
575        range: RangeInclusive<BlockNumber>,
576    ) -> ProviderResult<TrieUpdatesSorted>
577    where
578        P: DBProvider
579            + StageCheckpointReader
580            + ChangeSetReader
581            + StorageChangeSetReader
582            + BlockNumReader
583            + StorageSettingsCache,
584    {
585        // Get the database tip block number
586        let db_tip_block = provider
587            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
588            .as_ref()
589            .map(|chk| chk.block_number)
590            .ok_or_else(|| ProviderError::InsufficientChangesets {
591                requested: *range.start(),
592                available: 0..=0,
593            })?;
594
595        let start_block = *range.start();
596        let end_block = *range.end();
597
598        // If range end is beyond the tip, return an error
599        if end_block > db_tip_block {
600            return Err(ProviderError::InsufficientChangesets {
601                requested: end_block,
602                available: 0..=db_tip_block,
603            });
604        }
605
606        let timer = Instant::now();
607
608        debug!(
609            target: "trie::changeset_cache",
610            start_block,
611            end_block,
612            db_tip_block,
613            "Starting get_or_compute_range"
614        );
615
616        // Use changeset cache to retrieve and accumulate reverts block by block.
617        // Iterate in reverse order (newest to oldest) so that older changesets
618        // take precedence when there are conflicting updates.
619        let mut accumulated_reverts = TrieUpdatesSorted::default();
620
621        for block_number in range.rev() {
622            // Get the block hash for this block number
623            let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
624                ProviderError::other(std::io::Error::new(
625                    std::io::ErrorKind::NotFound,
626                    format!("block hash not found for block number {}", block_number),
627                ))
628            })?;
629
630            debug!(
631                target: "trie::changeset_cache",
632                block_number,
633                ?block_hash,
634                "Looked up block hash for block number in range"
635            );
636
637            // Get changesets from cache (or compute on-the-fly)
638            let changesets = self.get_or_compute(block_hash, block_number, provider)?;
639
640            // Overlay this block's changesets on top of accumulated reverts.
641            // Since we iterate newest to oldest, older values are added last
642            // and overwrite any conflicting newer values (oldest changeset values take
643            // precedence).
644            accumulated_reverts.extend_ref_and_sort(&changesets);
645        }
646
647        let elapsed = timer.elapsed();
648
649        let num_account_nodes = accumulated_reverts.account_nodes_ref().len();
650        let num_storage_tries = accumulated_reverts.storage_tries_ref().len();
651
652        debug!(
653            target: "trie::changeset_cache",
654            ?elapsed,
655            start_block,
656            end_block,
657            num_blocks = end_block.saturating_sub(start_block).saturating_add(1),
658            num_account_nodes,
659            num_storage_tries,
660            "Finished accumulating trie reverts for block range"
661        );
662
663        Ok(accumulated_reverts)
664    }
665}
666
667/// Guard for a pending changeset computation.
668///
669/// Returned by [`ChangesetCache::register_pending`]. Must be resolved via [`Self::resolve`]
670/// to insert the computed changesets into the cache and wake waiting threads.
671///
672/// If dropped without resolving (e.g. due to a panic), the pending entry is automatically
673/// cancelled so waiters fall through to the DB fallback.
674#[must_use = "call .resolve() to insert changesets into the cache"]
675pub struct PendingChangesetGuard {
676    cache: ChangesetCache,
677    block_hash: B256,
678    /// `None` after [`Self::resolve`] has been called.
679    pending: Option<Arc<PendingChangeset>>,
680}
681
682impl PendingChangesetGuard {
683    /// Resolves the pending computation by inserting the changesets into the cache
684    /// and waking all waiting threads.
685    pub fn resolve(mut self, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
686        self.cache.insert(self.block_hash, block_number, changesets);
687        self.pending = None;
688    }
689}
690
691impl fmt::Debug for PendingChangesetGuard {
692    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
693        f.debug_struct("PendingChangesetGuard").field("block_hash", &self.block_hash).finish()
694    }
695}
696
697impl Drop for PendingChangesetGuard {
698    fn drop(&mut self) {
699        let Some(pending) = self.pending.take() else {
700            // Guard was resolved successfully already, no-op
701            return
702        };
703
704        let removed = self.cache.inner.write().pending.remove(&self.block_hash);
705        if let Some(removed) = removed {
706            if Arc::ptr_eq(&removed, &pending) {
707                debug!(
708                    target: "trie::changeset_cache",
709                    block_hash = ?self.block_hash,
710                    "Pending changeset dropped without resolution, cancelling"
711                );
712                removed.cancel();
713            } else {
714                // Put it back — it belongs to a different registration.
715                self.cache.inner.write().pending.insert(self.block_hash, removed);
716            }
717        }
718    }
719}
720
/// In-memory cache for trie changesets with explicit eviction policy.
///
/// Holds changesets for blocks that have been validated but not yet persisted.
/// Keyed by block hash for fast lookup during reorgs. Eviction is controlled
/// explicitly by the engine API tree handler when persistence completes.
///
/// ## Eviction Policy
///
/// Unlike traditional caches with automatic eviction, this cache requires explicit
/// eviction calls. The engine API tree handler calls `evict(block_number)` after
/// blocks are persisted to the database, ensuring changesets remain available
/// until their corresponding blocks are safely on disk.
///
/// ## Metrics
///
/// The cache maintains several metrics for observability:
/// - `hits`: Number of successful cache lookups
/// - `misses`: Number of failed cache lookups
/// - `evictions`: Number of blocks evicted
/// - `size`: Current number of cached blocks
#[derive(Debug)]
struct ChangesetCacheInner {
    /// Cache entries: block hash -> (block number, changesets)
    entries: B256Map<(u64, Arc<TrieUpdatesSorted>)>,

    /// Block number to hashes mapping for eviction
    /// (multiple hashes per number can occur during reorgs).
    block_numbers: BTreeMap<u64, Vec<B256>>,

    /// Pending changeset computations: block hash -> pending entry.
    /// Threads waiting on a pending entry will block until it's resolved.
    pending: B256Map<Arc<PendingChangeset>>,

    /// Metrics for monitoring cache behavior
    #[cfg(feature = "metrics")]
    metrics: ChangesetCacheMetrics,
}
757
#[cfg(feature = "metrics")]
/// Metrics for the changeset cache.
///
/// These metrics provide visibility into cache performance and help identify
/// potential issues like high miss rates.
#[derive(Metrics, Clone)]
#[metrics(scope = "trie.changeset_cache")]
struct ChangesetCacheMetrics {
    /// Cache hit counter
    hits: Counter,

    /// Cache miss counter
    misses: Counter,

    /// Eviction counter
    evictions: Counter,

    /// Current cache size (number of entries)
    size: Gauge,
}
778
impl Default for ChangesetCacheInner {
    /// Equivalent to [`ChangesetCacheInner::new`].
    fn default() -> Self {
        Self::new()
    }
}
784
impl ChangesetCacheInner {
    /// Creates a new empty changeset cache.
    ///
    /// The cache has no capacity limit and relies on explicit eviction
    /// via the `evict()` method to manage memory usage.
    fn new() -> Self {
        Self {
            entries: B256Map::default(),
            block_numbers: BTreeMap::new(),
            pending: B256Map::default(),
            #[cfg(feature = "metrics")]
            metrics: Default::default(),
        }
    }
799
800    fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
801        match self.entries.get(block_hash) {
802            Some((_, changesets)) => {
803                #[cfg(feature = "metrics")]
804                self.metrics.hits.increment(1);
805                Some(Arc::clone(changesets))
806            }
807            None => {
808                #[cfg(feature = "metrics")]
809                self.metrics.misses.increment(1);
810                None
811            }
812        }
813    }
814
815    fn insert(&mut self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
816        debug!(
817            target: "trie::changeset_cache",
818            ?block_hash,
819            block_number,
820            cache_size_before = self.entries.len(),
821            "Inserting changeset into cache"
822        );
823
824        // Insert the entry
825        self.entries.insert(block_hash, (block_number, changesets));
826
827        // Add block hash to block_numbers mapping
828        self.block_numbers.entry(block_number).or_default().push(block_hash);
829
830        // Update size metric
831        #[cfg(feature = "metrics")]
832        self.metrics.size.set(self.entries.len() as f64);
833
834        debug!(
835            target: "trie::changeset_cache",
836            ?block_hash,
837            block_number,
838            cache_size_after = self.entries.len(),
839            "Changeset inserted into cache"
840        );
841    }
842
843    fn evict(&mut self, up_to_block: BlockNumber) {
844        debug!(
845            target: "trie::changeset_cache",
846            up_to_block,
847            cache_size_before = self.entries.len(),
848            "Starting cache eviction"
849        );
850
851        // Find all block numbers that should be evicted (< up_to_block)
852        let blocks_to_evict: Vec<u64> =
853            self.block_numbers.range(..up_to_block).map(|(num, _)| *num).collect();
854
855        // Remove entries for each block number below threshold
856        #[cfg(feature = "metrics")]
857        let mut evicted_count = 0;
858        #[cfg(not(feature = "metrics"))]
859        let mut evicted_count = 0;
860
861        for block_number in &blocks_to_evict {
862            if let Some(hashes) = self.block_numbers.remove(block_number) {
863                debug!(
864                    target: "trie::changeset_cache",
865                    block_number,
866                    num_hashes = hashes.len(),
867                    "Evicting block from cache"
868                );
869                for hash in hashes {
870                    if self.entries.remove(&hash).is_some() {
871                        evicted_count += 1;
872                    }
873                }
874            }
875        }
876
877        debug!(
878            target: "trie::changeset_cache",
879            up_to_block,
880            evicted_count,
881            cache_size_after = self.entries.len(),
882            "Finished cache eviction"
883        );
884
885        // Update metrics if we evicted anything
886        #[cfg(feature = "metrics")]
887        if evicted_count > 0 {
888            self.metrics.evictions.increment(evicted_count as u64);
889            self.metrics.size.set(self.entries.len() as f64);
890        }
891    }
892}
893
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::{B256Map, HashMap};

    /// Builds an empty `TrieUpdatesSorted` wrapped in an `Arc` for tests.
    fn create_test_changesets() -> Arc<TrieUpdatesSorted> {
        Arc::new(TrieUpdatesSorted::new(vec![], B256Map::default()))
    }

    #[test]
    fn test_insert_and_retrieve_single_entry() {
        let mut cache = ChangesetCacheInner::new();
        let block_hash = B256::random();

        cache.insert(block_hash, 100, create_test_changesets());

        // The entry must come back out of the cache.
        assert!(cache.get(&block_hash).is_some());
        assert_eq!(cache.entries.len(), 1);
    }

    #[test]
    fn test_insert_multiple_entries() {
        let mut cache = ChangesetCacheInner::new();

        // Populate ten consecutive blocks starting at height 100.
        let inserted: Vec<B256> = (0..10)
            .map(|offset| {
                let hash = B256::random();
                cache.insert(hash, 100 + offset, create_test_changesets());
                hash
            })
            .collect();

        // Every inserted hash must be retrievable.
        assert_eq!(cache.entries.len(), 10);
        assert!(inserted.iter().all(|hash| cache.get(hash).is_some()));
    }

    #[test]
    fn test_eviction_when_explicitly_called() {
        let mut cache = ChangesetCacheInner::new();

        // Fill heights 0..15.
        let tracked: Vec<(u64, B256)> = (0..15)
            .map(|number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                (number, hash)
            })
            .collect();

        // Nothing is evicted automatically.
        assert_eq!(cache.entries.len(), 15);

        // Drop everything below height 4.
        cache.evict(4);

        // blocks 4-14 = 11 blocks
        assert_eq!(cache.entries.len(), 11);

        for (number, hash) in &tracked {
            if *number < 4 {
                assert!(cache.get(hash).is_none(), "Block {} should be evicted", number);
            } else {
                assert!(cache.get(hash).is_some(), "Block {} should be present", number);
            }
        }
    }

    #[test]
    fn test_eviction_with_persistence_watermark() {
        let mut cache = ChangesetCacheInner::new();

        // Fill heights 100..=165.
        let mut by_number = HashMap::new();
        for number in 100..=165 {
            let hash = B256::random();
            cache.insert(hash, number, create_test_changesets());
            by_number.insert(number, hash);
        }

        // Nothing is evicted automatically.
        assert_eq!(cache.entries.len(), 66);

        // Persisted to block 164 with a 64-block retention window:
        // threshold = 164 - 64 = 100, so everything survives.
        cache.evict(100);
        assert_eq!(cache.entries.len(), 66);

        // Persisted to block 165: threshold = 165 - 64 = 101,
        // so block 100 drops out and 101..=165 remain.
        cache.evict(101);
        assert_eq!(cache.entries.len(), 65);
        assert!(cache.get(&by_number[&100]).is_none());
        assert!(cache.get(&by_number[&101]).is_some());
    }

    #[test]
    fn test_out_of_order_inserts_with_explicit_eviction() {
        let mut cache = ChangesetCacheInner::new();

        // Insert heights in a deliberately non-monotonic order.
        let tracked: Vec<(u64, B256)> = [10u64, 5, 15, 3]
            .into_iter()
            .map(|number| {
                let hash = B256::random();
                cache.insert(hash, number, create_test_changesets());
                (number, hash)
            })
            .collect();

        // Nothing is evicted automatically.
        assert_eq!(cache.entries.len(), 4);

        // Drop everything below height 5.
        cache.evict(5);

        for (number, hash) in &tracked {
            if *number < 5 {
                assert!(cache.get(hash).is_none(), "Block {} should be evicted", number);
            } else {
                assert!(cache.get(hash).is_some(), "Block {} should be present", number);
            }
        }
    }

    #[test]
    fn test_multiple_blocks_same_number() {
        let mut cache = ChangesetCacheInner::new();

        // Two distinct fork blocks sharing height 100.
        let fork_a = B256::random();
        let fork_b = B256::random();
        cache.insert(fork_a, 100, create_test_changesets());
        cache.insert(fork_b, 100, create_test_changesets());

        // Both forks must coexist in the cache.
        assert!(cache.get(&fork_a).is_some());
        assert!(cache.get(&fork_b).is_some());
        assert_eq!(cache.entries.len(), 2);
    }

    #[test]
    fn test_eviction_removes_all_side_chains() {
        let mut cache = ChangesetCacheInner::new();

        // Three competing forks at height 10, plus one block at height 20.
        let forks: Vec<B256> = (0..3)
            .map(|_| {
                let hash = B256::random();
                cache.insert(hash, 10, create_test_changesets());
                hash
            })
            .collect();
        let later = B256::random();
        cache.insert(later, 20, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        // Evicting below 15 must drop every fork at height 10 at once.
        cache.evict(15);

        assert_eq!(cache.entries.len(), 1);
        assert!(forks.iter().all(|hash| cache.get(hash).is_none()));
        assert!(cache.get(&later).is_some());
    }
}