reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::{
3        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4    },
5    providers::{
6        database::{chain::ChainStorage, metrics},
7        rocksdb::RocksDBProvider,
8        static_file::StaticFileWriter,
9        NodeTypesForProvider, StaticFileProvider,
10    },
11    to_range,
12    traits::{
13        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
14    },
15    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
16    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
17    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
18    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
19    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
20    PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, RocksDBProviderFactory,
21    StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
22    StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
23    TransactionsProviderExt, TrieReader, TrieWriter,
24};
25use alloy_consensus::{
26    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
27    BlockHeader, TxReceipt,
28};
29use alloy_eips::BlockHashOrNumber;
30use alloy_primitives::{
31    keccak256,
32    map::{hash_map, B256Map, HashMap, HashSet},
33    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
34};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use rayon::slice::ParallelSliceMut;
38use reth_chain_state::ExecutedBlock;
39use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
40use reth_db_api::{
41    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
42    database::Database,
43    models::{
44        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
45        BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
46    },
47    table::Table,
48    tables,
49    transaction::{DbTx, DbTxMut},
50    BlockNumberList, PlainAccountState, PlainStorageState,
51};
52use reth_execution_types::{Chain, ExecutionOutcome};
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives_traits::{
55    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
65    TryIntoHistoricalStateProvider,
66};
67use reth_storage_errors::provider::ProviderResult;
68use reth_trie::{
69    trie_cursor::{
70        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
71        TrieCursorIter,
72    },
73    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
74    HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
75};
76use reth_trie_db::{
77    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
78};
79use revm_database::states::{
80    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
81};
82use std::{
83    cmp::Ordering,
84    collections::{BTreeMap, BTreeSet},
85    fmt::Debug,
86    ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
87    sync::Arc,
88};
89use tracing::{debug, trace};
90
/// A [`DatabaseProvider`] that holds a read-only database transaction.
///
/// The transaction type is the [`Database::TX`] associated type of the `DB` parameter.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
93
/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// This is a newtype around a [`DatabaseProvider`] whose transaction type is the
/// [`Database::TXMut`] associated type of `DB`. The wrapped provider is the public field `0`.
///
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
/// Once that issue is solved, we can probably revert back to being an alias type.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
102
103impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
104    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
105
106    fn deref(&self) -> &Self::Target {
107        &self.0
108    }
109}
110
111impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
112    fn deref_mut(&mut self) -> &mut Self::Target {
113        &mut self.0
114    }
115}
116
117impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
118    for DatabaseProviderRW<DB, N>
119{
120    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
121        &self.0
122    }
123}
124
125impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
126    /// Commit database transaction and static file if it exists.
127    pub fn commit(self) -> ProviderResult<bool> {
128        self.0.commit()
129    }
130
131    /// Consume `DbTx` or `DbTxMut`.
132    pub fn into_tx(self) -> <DB as Database>::TXMut {
133        self.0.into_tx()
134    }
135
136    /// Override the minimum pruning distance for testing purposes.
137    #[cfg(any(test, feature = "test-utils"))]
138    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
139        self.0.minimum_pruning_distance = distance;
140        self
141    }
142}
143
144impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
145    for DatabaseProvider<<DB as Database>::TXMut, N>
146{
147    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
148        provider.0
149    }
150}
151
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
///
/// `TX` is the database transaction type held by the provider and `N` supplies the node's
/// chain spec, primitives and storage types.
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Pending `RocksDB` batches to be committed at provider commit time.
    ///
    /// Only present on unix targets with the `rocksdb` feature enabled.
    #[cfg(all(unix, feature = "rocksdb"))]
    pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
    /// Minimum distance from tip required for pruning
    ///
    /// The constructors in this file initialize it to [`MINIMUM_PRUNING_DISTANCE`].
    minimum_pruning_distance: u64,
}
175
176impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        let mut s = f.debug_struct("DatabaseProvider");
179        s.field("tx", &self.tx)
180            .field("chain_spec", &self.chain_spec)
181            .field("static_file_provider", &self.static_file_provider)
182            .field("prune_modes", &self.prune_modes)
183            .field("storage", &self.storage)
184            .field("storage_settings", &self.storage_settings)
185            .field("rocksdb_provider", &self.rocksdb_provider);
186        #[cfg(all(unix, feature = "rocksdb"))]
187        s.field("pending_rocksdb_batches", &"<pending batches>");
188        s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
189    }
190}
191
192impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
193    /// Returns reference to prune modes.
194    pub const fn prune_modes_ref(&self) -> &PruneModes {
195        &self.prune_modes
196    }
197}
198
199impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
200    /// State provider for latest state
201    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
202        trace!(target: "providers::db", "Returning latest state provider");
203        Box::new(LatestStateProviderRef::new(self))
204    }
205
206    /// Storage provider for state at that given block hash
207    pub fn history_by_block_hash<'a>(
208        &'a self,
209        block_hash: BlockHash,
210    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
211        let mut block_number =
212            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
213        if block_number == self.best_block_number().unwrap_or_default() &&
214            block_number == self.last_block_number().unwrap_or_default()
215        {
216            return Ok(Box::new(LatestStateProviderRef::new(self)))
217        }
218
219        // +1 as the changeset that we want is the one that was applied after this block.
220        block_number += 1;
221
222        let account_history_prune_checkpoint =
223            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
224        let storage_history_prune_checkpoint =
225            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
226
227        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
228
229        // If we pruned account or storage history, we can't return state on every historical block.
230        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
231        if let Some(prune_checkpoint_block_number) =
232            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
233        {
234            state_provider = state_provider.with_lowest_available_account_history_block_number(
235                prune_checkpoint_block_number + 1,
236            );
237        }
238        if let Some(prune_checkpoint_block_number) =
239            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
240        {
241            state_provider = state_provider.with_lowest_available_storage_history_block_number(
242                prune_checkpoint_block_number + 1,
243            );
244        }
245
246        Ok(Box::new(state_provider))
247    }
248
249    #[cfg(feature = "test-utils")]
250    /// Sets the prune modes for provider.
251    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
252        self.prune_modes = prune_modes;
253    }
254}
255
256impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
257    type Primitives = N::Primitives;
258}
259
260impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
261    /// Returns a static file provider
262    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
263        self.static_file_provider.clone()
264    }
265
266    fn get_static_file_writer(
267        &self,
268        block: BlockNumber,
269        segment: StaticFileSegment,
270    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
271        self.static_file_provider.get_writer(block, segment)
272    }
273}
274
275impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
276    /// Returns the `RocksDB` provider.
277    fn rocksdb_provider(&self) -> RocksDBProvider {
278        self.rocksdb_provider.clone()
279    }
280
281    #[cfg(all(unix, feature = "rocksdb"))]
282    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
283        self.pending_rocksdb_batches.lock().push(batch);
284    }
285}
286
287impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
288    for DatabaseProvider<TX, N>
289{
290    type ChainSpec = N::ChainSpec;
291
292    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
293        self.chain_spec.clone()
294    }
295}
296
297impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
298    /// Creates a provider with an inner read-write transaction.
299    pub const fn new_rw(
300        tx: TX,
301        chain_spec: Arc<N::ChainSpec>,
302        static_file_provider: StaticFileProvider<N::Primitives>,
303        prune_modes: PruneModes,
304        storage: Arc<N::Storage>,
305        storage_settings: Arc<RwLock<StorageSettings>>,
306        rocksdb_provider: RocksDBProvider,
307    ) -> Self {
308        Self {
309            tx,
310            chain_spec,
311            static_file_provider,
312            prune_modes,
313            storage,
314            storage_settings,
315            rocksdb_provider,
316            #[cfg(all(unix, feature = "rocksdb"))]
317            pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
318            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
319        }
320    }
321}
322
323impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
324    fn as_ref(&self) -> &Self {
325        self
326    }
327}
328
329impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
330    /// Writes executed blocks and state to storage.
331    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
332        if blocks.is_empty() {
333            debug!(target: "providers::db", "Attempted to write empty block range");
334            return Ok(())
335        }
336
337        // NOTE: checked non-empty above
338        let first_block = blocks.first().unwrap().recovered_block();
339
340        let last_block = blocks.last().unwrap().recovered_block();
341        let first_number = first_block.number();
342        let last_block_number = last_block.number();
343
344        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
345
346        // TODO: Do performant / batched writes for each type of object
347        // instead of a loop over all blocks,
348        // meaning:
349        //  * blocks
350        //  * state
351        //  * hashed state
352        //  * trie updates (cannot naively extend, need helper)
353        //  * indices (already done basically)
354        // Insert the blocks
355        for block in blocks {
356            let trie_data = block.trie_data();
357            let ExecutedBlock { recovered_block, execution_output, .. } = block;
358            let block_number = recovered_block.number();
359            self.insert_block(&recovered_block)?;
360
361            // Write state and changesets to the database.
362            // Must be written after blocks because of the receipt lookup.
363            self.write_state(&execution_output, OriginalValuesKnown::No)?;
364
365            // insert hashes and intermediate merkle nodes
366            self.write_hashed_state(&trie_data.hashed_state)?;
367
368            self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
369            self.write_trie_updates_sorted(&trie_data.trie_updates)?;
370        }
371
372        // update history indices
373        self.update_history_indices(first_number..=last_block_number)?;
374
375        // Update pipeline progress
376        self.update_pipeline_stages(last_block_number, false)?;
377
378        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
379
380        Ok(())
381    }
382
383    /// Unwinds trie state starting at and including the given block.
384    ///
385    /// This includes calculating the resulted state root and comparing it with the parent block
386    /// state root.
387    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
388        let changed_accounts = self
389            .tx
390            .cursor_read::<tables::AccountChangeSets>()?
391            .walk_range(from..)?
392            .collect::<Result<Vec<_>, _>>()?;
393
394        // Unwind account hashes.
395        self.unwind_account_hashing(changed_accounts.iter())?;
396
397        // Unwind account history indices.
398        self.unwind_account_history_indices(changed_accounts.iter())?;
399
400        let storage_start = BlockNumberAddress((from, Address::ZERO));
401        let changed_storages = self
402            .tx
403            .cursor_read::<tables::StorageChangeSets>()?
404            .walk_range(storage_start..)?
405            .collect::<Result<Vec<_>, _>>()?;
406
407        // Unwind storage hashes.
408        self.unwind_storage_hashing(changed_storages.iter().copied())?;
409
410        // Unwind storage history indices.
411        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
412
413        // Unwind accounts/storages trie tables using the revert.
414        let trie_revert = self.trie_reverts(from)?;
415        self.write_trie_updates_sorted(&trie_revert)?;
416
417        // Clear trie changesets which have been unwound.
418        self.clear_trie_changesets_from(from)?;
419
420        Ok(())
421    }
422
423    /// Removes receipts from all transactions starting with provided number (inclusive).
424    fn remove_receipts_from(
425        &self,
426        from_tx: TxNumber,
427        last_block: BlockNumber,
428    ) -> ProviderResult<()> {
429        // iterate over block body and remove receipts
430        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
431
432        if EitherWriter::receipts_destination(self).is_static_file() {
433            let static_file_receipt_num =
434                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
435
436            let to_delete = static_file_receipt_num
437                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
438                .unwrap_or_default();
439
440            self.static_file_provider
441                .latest_writer(StaticFileSegment::Receipts)?
442                .prune_receipts(to_delete, last_block)?;
443        }
444
445        Ok(())
446    }
447}
448
449impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
450    fn try_into_history_at_block(
451        self,
452        mut block_number: BlockNumber,
453    ) -> ProviderResult<StateProviderBox> {
454        // if the block number is the same as the currently best block number on disk we can use the
455        // latest state provider here
456        if block_number == self.best_block_number().unwrap_or_default() {
457            return Ok(Box::new(LatestStateProvider::new(self)))
458        }
459
460        // +1 as the changeset that we want is the one that was applied after this block.
461        block_number += 1;
462
463        let account_history_prune_checkpoint =
464            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
465        let storage_history_prune_checkpoint =
466            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
467
468        let mut state_provider = HistoricalStateProvider::new(self, block_number);
469
470        // If we pruned account or storage history, we can't return state on every historical block.
471        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
472        if let Some(prune_checkpoint_block_number) =
473            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
474        {
475            state_provider = state_provider.with_lowest_available_account_history_block_number(
476                prune_checkpoint_block_number + 1,
477            );
478        }
479        if let Some(prune_checkpoint_block_number) =
480            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
481        {
482            state_provider = state_provider.with_lowest_available_storage_history_block_number(
483                prune_checkpoint_block_number + 1,
484            );
485        }
486
487        Ok(Box::new(state_provider))
488    }
489}
490
491/// For a given key, unwind all history shards that contain block numbers at or above the given
492/// block number.
493///
494/// S - Sharded key subtype.
495/// T - Table to walk over.
496/// C - Cursor implementation.
497///
498/// This function walks the entries from the given start key and deletes all shards that belong to
499/// the key and contain block numbers at or above the given block number. Shards entirely below
500/// the block number are preserved.
501///
502/// The boundary shard (the shard that spans across the block number) is removed from the database.
503/// Any indices that are below the block number are filtered out and returned for reinsertion.
504/// The boundary shard is returned for reinsertion (if it's not empty).
505fn unwind_history_shards<S, T, C>(
506    cursor: &mut C,
507    start_key: T::Key,
508    block_number: BlockNumber,
509    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
510) -> ProviderResult<Vec<u64>>
511where
512    T: Table<Value = BlockNumberList>,
513    T::Key: AsRef<ShardedKey<S>>,
514    C: DbCursorRO<T> + DbCursorRW<T>,
515{
516    // Start from the given key and iterate through shards
517    let mut item = cursor.seek_exact(start_key)?;
518    while let Some((sharded_key, list)) = item {
519        // If the shard does not belong to the key, break.
520        if !shard_belongs_to_key(&sharded_key) {
521            break
522        }
523
524        // Always delete the current shard from the database first
525        // We'll decide later what (if anything) to reinsert
526        cursor.delete_current()?;
527
528        // Get the first (lowest) block number in this shard
529        // All block numbers in a shard are sorted in ascending order
530        let first = list.iter().next().expect("List can't be empty");
531
532        // Case 1: Entire shard is at or above the unwinding point
533        // Keep it deleted (don't return anything for reinsertion)
534        if first >= block_number {
535            item = cursor.prev()?;
536            continue
537        }
538        // Case 2: This is a boundary shard (spans across the unwinding point)
539        // The shard contains some blocks below and some at/above the unwinding point
540        else if block_number <= sharded_key.as_ref().highest_block_number {
541            // Return only the block numbers that are below the unwinding point
542            // These will be reinserted to preserve the historical data
543            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
544        }
545        // Case 3: Entire shard is below the unwinding point
546        // Return all block numbers for reinsertion (preserve entire shard)
547        return Ok(list.iter().collect::<Vec<_>>())
548    }
549
550    // No shards found or all processed
551    Ok(Vec::new())
552}
553
554impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
555    /// Creates a provider with an inner read-only transaction.
556    pub const fn new(
557        tx: TX,
558        chain_spec: Arc<N::ChainSpec>,
559        static_file_provider: StaticFileProvider<N::Primitives>,
560        prune_modes: PruneModes,
561        storage: Arc<N::Storage>,
562        storage_settings: Arc<RwLock<StorageSettings>>,
563        rocksdb_provider: RocksDBProvider,
564    ) -> Self {
565        Self {
566            tx,
567            chain_spec,
568            static_file_provider,
569            prune_modes,
570            storage,
571            storage_settings,
572            rocksdb_provider,
573            #[cfg(all(unix, feature = "rocksdb"))]
574            pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
575            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
576        }
577    }
578
579    /// Consume `DbTx` or `DbTxMut`.
580    pub fn into_tx(self) -> TX {
581        self.tx
582    }
583
584    /// Pass `DbTx` or `DbTxMut` mutable reference.
585    pub const fn tx_mut(&mut self) -> &mut TX {
586        &mut self.tx
587    }
588
589    /// Pass `DbTx` or `DbTxMut` immutable reference.
590    pub const fn tx_ref(&self) -> &TX {
591        &self.tx
592    }
593
594    /// Returns a reference to the chain specification.
595    pub fn chain_spec(&self) -> &N::ChainSpec {
596        &self.chain_spec
597    }
598}
599
600impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
601    fn recovered_block<H, HF, B, BF>(
602        &self,
603        id: BlockHashOrNumber,
604        _transaction_kind: TransactionVariant,
605        header_by_number: HF,
606        construct_block: BF,
607    ) -> ProviderResult<Option<B>>
608    where
609        H: AsRef<HeaderTy<N>>,
610        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
611        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
612    {
613        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
614        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
615
616        // Get the block body
617        //
618        // If the body indices are not found, this means that the transactions either do not exist
619        // in the database yet, or they do exit but are not indexed. If they exist but are not
620        // indexed, we don't have enough information to return the block anyways, so we return
621        // `None`.
622        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
623
624        let tx_range = body.tx_num_range();
625
626        let (transactions, senders) = if tx_range.is_empty() {
627            (vec![], vec![])
628        } else {
629            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
630        };
631
632        let body = self
633            .storage
634            .reader()
635            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
636            .pop()
637            .ok_or(ProviderError::InvalidStorageOutput)?;
638
639        construct_block(header, body, senders)
640    }
641
642    /// Returns a range of blocks from the database.
643    ///
644    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
645    /// construct blocks from the following inputs:
646    ///     – Header
647    ///     - Range of transaction numbers
648    ///     – Ommers
649    ///     – Withdrawals
650    ///     – Senders
651    fn block_range<F, H, HF, R>(
652        &self,
653        range: RangeInclusive<BlockNumber>,
654        headers_range: HF,
655        mut assemble_block: F,
656    ) -> ProviderResult<Vec<R>>
657    where
658        H: AsRef<HeaderTy<N>>,
659        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
660        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
661    {
662        if range.is_empty() {
663            return Ok(Vec::new())
664        }
665
666        let len = range.end().saturating_sub(*range.start()) as usize;
667        let mut blocks = Vec::with_capacity(len);
668
669        let headers = headers_range(range.clone())?;
670
671        // If the body indices are not found, this means that the transactions either do
672        // not exist in the database yet, or they do exit but are
673        // not indexed. If they exist but are not indexed, we don't
674        // have enough information to return the block anyways, so
675        // we skip the block.
676        let present_headers = self
677            .block_body_indices_range(range)?
678            .into_iter()
679            .map(|b| b.tx_num_range())
680            .zip(headers)
681            .collect::<Vec<_>>();
682
683        let mut inputs = Vec::new();
684        for (tx_range, header) in &present_headers {
685            let transactions = if tx_range.is_empty() {
686                Vec::new()
687            } else {
688                self.transactions_by_tx_range(tx_range.clone())?
689            };
690
691            inputs.push((header.as_ref(), transactions));
692        }
693
694        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
695
696        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
697            blocks.push(assemble_block(header, body, tx_range)?);
698        }
699
700        Ok(blocks)
701    }
702
703    /// Returns a range of blocks from the database, along with the senders of each
704    /// transaction in the blocks.
705    ///
706    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
707    /// construct blocks from the following inputs:
708    ///     – Header
709    ///     - Transactions
710    ///     – Ommers
711    ///     – Withdrawals
712    ///     – Senders
713    fn block_with_senders_range<H, HF, B, BF>(
714        &self,
715        range: RangeInclusive<BlockNumber>,
716        headers_range: HF,
717        assemble_block: BF,
718    ) -> ProviderResult<Vec<B>>
719    where
720        H: AsRef<HeaderTy<N>>,
721        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
722        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
723    {
724        self.block_range(range, headers_range, |header, body, tx_range| {
725            let senders = if tx_range.is_empty() {
726                Vec::new()
727            } else {
728                let known_senders: HashMap<TxNumber, Address> =
729                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
730
731                let mut senders = Vec::with_capacity(body.transactions().len());
732                for (tx_num, tx) in tx_range.zip(body.transactions()) {
733                    match known_senders.get(&tx_num) {
734                        None => {
735                            // recover the sender from the transaction if not found
736                            let sender = tx.recover_signer_unchecked()?;
737                            senders.push(sender);
738                        }
739                        Some(sender) => senders.push(*sender),
740                    }
741                }
742
743                senders
744            };
745
746            assemble_block(header, body, senders)
747        })
748    }
749
750    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
751    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
752    /// account changesets.
753    fn populate_bundle_state<A, S>(
754        &self,
755        account_changeset: Vec<(u64, AccountBeforeTx)>,
756        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
757        plain_accounts_cursor: &mut A,
758        plain_storage_cursor: &mut S,
759    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
760    where
761        A: DbCursorRO<PlainAccountState>,
762        S: DbDupCursorRO<PlainStorageState>,
763    {
764        // iterate previous value and get plain state value to create changeset
765        // Double option around Account represent if Account state is know (first option) and
766        // account is removed (Second Option)
767        let mut state: BundleStateInit = HashMap::default();
768
769        // This is not working for blocks that are not at tip. as plain state is not the last
770        // state of end range. We should rename the functions or add support to access
771        // History state. Accessing history state can be tricky but we are not gaining
772        // anything.
773
774        let mut reverts: RevertsInit = HashMap::default();
775
776        // add account changeset changes
777        for (block_number, account_before) in account_changeset.into_iter().rev() {
778            let AccountBeforeTx { info: old_info, address } = account_before;
779            match state.entry(address) {
780                hash_map::Entry::Vacant(entry) => {
781                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
782                    entry.insert((old_info, new_info, HashMap::default()));
783                }
784                hash_map::Entry::Occupied(mut entry) => {
785                    // overwrite old account state.
786                    entry.get_mut().0 = old_info;
787                }
788            }
789            // insert old info into reverts.
790            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
791        }
792
793        // add storage changeset changes
794        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
795            let BlockNumberAddress((block_number, address)) = block_and_address;
796            // get account state or insert from plain state.
797            let account_state = match state.entry(address) {
798                hash_map::Entry::Vacant(entry) => {
799                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
800                    entry.insert((present_info, present_info, HashMap::default()))
801                }
802                hash_map::Entry::Occupied(entry) => entry.into_mut(),
803            };
804
805            // match storage.
806            match account_state.2.entry(old_storage.key) {
807                hash_map::Entry::Vacant(entry) => {
808                    let new_storage = plain_storage_cursor
809                        .seek_by_key_subkey(address, old_storage.key)?
810                        .filter(|storage| storage.key == old_storage.key)
811                        .unwrap_or_default();
812                    entry.insert((old_storage.value, new_storage.value));
813                }
814                hash_map::Entry::Occupied(mut entry) => {
815                    entry.get_mut().0 = old_storage.value;
816                }
817            };
818
819            reverts
820                .entry(block_number)
821                .or_default()
822                .entry(address)
823                .or_default()
824                .1
825                .push(old_storage);
826        }
827
828        Ok((state, reverts))
829    }
830}
831
832impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
833    /// Insert history index to the database.
834    ///
835    /// For each updated partial key, this function retrieves the last shard from the database
836    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
837    /// the shards back into the database.
838    ///
839    /// This function is used by history indexing stages.
840    fn append_history_index<P, T>(
841        &self,
842        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
843        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
844    ) -> ProviderResult<()>
845    where
846        P: Copy,
847        T: Table<Value = BlockNumberList>,
848    {
849        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
850        // will append duplicate entries instead of updating existing ones, causing data corruption.
851        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
852
853        let mut cursor = self.tx.cursor_write::<T>()?;
854
855        for (partial_key, indices) in index_updates {
856            let last_key = sharded_key_factory(partial_key, u64::MAX);
857            let mut last_shard = cursor
858                .seek_exact(last_key.clone())?
859                .map(|(_, list)| list)
860                .unwrap_or_else(BlockNumberList::empty);
861
862            last_shard.append(indices).map_err(ProviderError::other)?;
863
864            // fast path: all indices fit in one shard
865            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
866                cursor.upsert(last_key, &last_shard)?;
867                continue;
868            }
869
870            // slow path: rechunk into multiple shards
871            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
872            let mut chunks_peekable = chunks.into_iter().peekable();
873
874            while let Some(chunk) = chunks_peekable.next() {
875                let shard = BlockNumberList::new_pre_sorted(chunk);
876                let highest_block_number = if chunks_peekable.peek().is_some() {
877                    shard.iter().next_back().expect("`chunks` does not return empty list")
878                } else {
879                    // Insert last list with `u64::MAX`.
880                    u64::MAX
881                };
882
883                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
884            }
885        }
886
887        Ok(())
888    }
889}
890
891impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
892    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
893        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
894    }
895}
896
897impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
898    fn changed_accounts_with_range(
899        &self,
900        range: impl RangeBounds<BlockNumber>,
901    ) -> ProviderResult<BTreeSet<Address>> {
902        self.tx
903            .cursor_read::<tables::AccountChangeSets>()?
904            .walk_range(range)?
905            .map(|entry| {
906                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
907            })
908            .collect()
909    }
910
911    fn basic_accounts(
912        &self,
913        iter: impl IntoIterator<Item = Address>,
914    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
915        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
916        Ok(iter
917            .into_iter()
918            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
919            .collect::<Result<Vec<_>, _>>()?)
920    }
921
922    fn changed_accounts_and_blocks_with_range(
923        &self,
924        range: RangeInclusive<BlockNumber>,
925    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
926        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
927
928        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
929            BTreeMap::new(),
930            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
931                let (index, account) = entry?;
932                accounts.entry(account.address).or_default().push(index);
933                Ok(accounts)
934            },
935        )?;
936
937        Ok(account_transitions)
938    }
939}
940
941impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
942    fn storage_changeset(
943        &self,
944        block_number: BlockNumber,
945    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
946        let range = block_number..=block_number;
947        let storage_range = BlockNumberAddress::range(range);
948        self.tx
949            .cursor_dup_read::<tables::StorageChangeSets>()?
950            .walk_range(storage_range)?
951            .map(|result| -> ProviderResult<_> { Ok(result?) })
952            .collect()
953    }
954}
955
956impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
957    fn account_block_changeset(
958        &self,
959        block_number: BlockNumber,
960    ) -> ProviderResult<Vec<AccountBeforeTx>> {
961        let range = block_number..=block_number;
962        self.tx
963            .cursor_read::<tables::AccountChangeSets>()?
964            .walk_range(range)?
965            .map(|result| -> ProviderResult<_> {
966                let (_, account_before) = result?;
967                Ok(account_before)
968            })
969            .collect()
970    }
971
972    fn get_account_before_block(
973        &self,
974        block_number: BlockNumber,
975        address: Address,
976    ) -> ProviderResult<Option<AccountBeforeTx>> {
977        self.tx
978            .cursor_dup_read::<tables::AccountChangeSets>()?
979            .seek_by_key_subkey(block_number, address)?
980            .filter(|acc| acc.address == address)
981            .map(Ok)
982            .transpose()
983    }
984}
985
986impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
987    for DatabaseProvider<TX, N>
988{
989    type Header = HeaderTy<N>;
990
991    fn local_tip_header(
992        &self,
993        highest_uninterrupted_block: BlockNumber,
994    ) -> ProviderResult<SealedHeader<Self::Header>> {
995        let static_file_provider = self.static_file_provider();
996
997        // Make sure Headers static file is at the same height. If it's further, this
998        // input execution was interrupted previously and we need to unwind the static file.
999        let next_static_file_block_num = static_file_provider
1000            .get_highest_static_file_block(StaticFileSegment::Headers)
1001            .map(|id| id + 1)
1002            .unwrap_or_default();
1003        let next_block = highest_uninterrupted_block + 1;
1004
1005        match next_static_file_block_num.cmp(&next_block) {
1006            // The node shutdown between an executed static file commit and before the database
1007            // commit, so we need to unwind the static files.
1008            Ordering::Greater => {
1009                let mut static_file_producer =
1010                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1011                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1012                // Since this is a database <-> static file inconsistency, we commit the change
1013                // straight away.
1014                static_file_producer.commit()?
1015            }
1016            Ordering::Less => {
1017                // There's either missing or corrupted files.
1018                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1019            }
1020            Ordering::Equal => {}
1021        }
1022
1023        let local_head = static_file_provider
1024            .sealed_header(highest_uninterrupted_block)?
1025            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1026
1027        Ok(local_head)
1028    }
1029}
1030
1031impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1032    type Header = HeaderTy<N>;
1033
1034    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1035        if let Some(num) = self.block_number(block_hash)? {
1036            Ok(self.header_by_number(num)?)
1037        } else {
1038            Ok(None)
1039        }
1040    }
1041
1042    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1043        self.static_file_provider.header_by_number(num)
1044    }
1045
1046    fn headers_range(
1047        &self,
1048        range: impl RangeBounds<BlockNumber>,
1049    ) -> ProviderResult<Vec<Self::Header>> {
1050        self.static_file_provider.headers_range(range)
1051    }
1052
1053    fn sealed_header(
1054        &self,
1055        number: BlockNumber,
1056    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1057        self.static_file_provider.sealed_header(number)
1058    }
1059
1060    fn sealed_headers_while(
1061        &self,
1062        range: impl RangeBounds<BlockNumber>,
1063        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1064    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1065        self.static_file_provider.sealed_headers_while(range, predicate)
1066    }
1067}
1068
1069impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1070    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1071        self.static_file_provider.block_hash(number)
1072    }
1073
1074    fn canonical_hashes_range(
1075        &self,
1076        start: BlockNumber,
1077        end: BlockNumber,
1078    ) -> ProviderResult<Vec<B256>> {
1079        self.static_file_provider.canonical_hashes_range(start, end)
1080    }
1081}
1082
1083impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1084    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1085        let best_number = self.best_block_number()?;
1086        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1087        Ok(ChainInfo { best_hash, best_number })
1088    }
1089
1090    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1091        // The best block number is tracked via the finished stage which gets updated in the same tx
1092        // when new blocks committed
1093        Ok(self
1094            .get_stage_checkpoint(StageId::Finish)?
1095            .map(|checkpoint| checkpoint.block_number)
1096            .unwrap_or_default())
1097    }
1098
1099    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1100        self.static_file_provider.last_block_number()
1101    }
1102
1103    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1104        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1105    }
1106}
1107
1108impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1109    type Block = BlockTy<N>;
1110
1111    fn find_block_by_hash(
1112        &self,
1113        hash: B256,
1114        source: BlockSource,
1115    ) -> ProviderResult<Option<Self::Block>> {
1116        if source.is_canonical() {
1117            self.block(hash.into())
1118        } else {
1119            Ok(None)
1120        }
1121    }
1122
1123    /// Returns the block with matching number from database.
1124    ///
1125    /// If the header for this block is not found, this returns `None`.
1126    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1127    /// will return None.
1128    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1129        if let Some(number) = self.convert_hash_or_number(id)? &&
1130            let Some(header) = self.header_by_number(number)?
1131        {
1132            // If the body indices are not found, this means that the transactions either do not
1133            // exist in the database yet, or they do exit but are not indexed.
1134            // If they exist but are not indexed, we don't have enough
1135            // information to return the block anyways, so we return `None`.
1136            let Some(transactions) = self.transactions_by_block(number.into())? else {
1137                return Ok(None)
1138            };
1139
1140            let body = self
1141                .storage
1142                .reader()
1143                .read_block_bodies(self, vec![(&header, transactions)])?
1144                .pop()
1145                .ok_or(ProviderError::InvalidStorageOutput)?;
1146
1147            return Ok(Some(Self::Block::new(header, body)))
1148        }
1149
1150        Ok(None)
1151    }
1152
1153    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1154        Ok(None)
1155    }
1156
1157    fn pending_block_and_receipts(
1158        &self,
1159    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1160        Ok(None)
1161    }
1162
1163    /// Returns the block with senders with matching number or hash from database.
1164    ///
1165    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1166    /// spot, and we want fast querying.**
1167    ///
1168    /// If the header for this block is not found, this returns `None`.
1169    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1170    /// will return None.
1171    fn recovered_block(
1172        &self,
1173        id: BlockHashOrNumber,
1174        transaction_kind: TransactionVariant,
1175    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1176        self.recovered_block(
1177            id,
1178            transaction_kind,
1179            |block_number| self.header_by_number(block_number),
1180            |header, body, senders| {
1181                Self::Block::new(header, body)
1182                    // Note: we're using unchecked here because we know the block contains valid txs
1183                    // wrt to its height and can ignore the s value check so pre
1184                    // EIP-2 txs are allowed
1185                    .try_into_recovered_unchecked(senders)
1186                    .map(Some)
1187                    .map_err(|_| ProviderError::SenderRecoveryError)
1188            },
1189        )
1190    }
1191
1192    fn sealed_block_with_senders(
1193        &self,
1194        id: BlockHashOrNumber,
1195        transaction_kind: TransactionVariant,
1196    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1197        self.recovered_block(
1198            id,
1199            transaction_kind,
1200            |block_number| self.sealed_header(block_number),
1201            |header, body, senders| {
1202                Self::Block::new_sealed(header, body)
1203                    // Note: we're using unchecked here because we know the block contains valid txs
1204                    // wrt to its height and can ignore the s value check so pre
1205                    // EIP-2 txs are allowed
1206                    .try_with_senders_unchecked(senders)
1207                    .map(Some)
1208                    .map_err(|_| ProviderError::SenderRecoveryError)
1209            },
1210        )
1211    }
1212
1213    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1214        self.block_range(
1215            range,
1216            |range| self.headers_range(range),
1217            |header, body, _| Ok(Self::Block::new(header, body)),
1218        )
1219    }
1220
1221    fn block_with_senders_range(
1222        &self,
1223        range: RangeInclusive<BlockNumber>,
1224    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1225        self.block_with_senders_range(
1226            range,
1227            |range| self.headers_range(range),
1228            |header, body, senders| {
1229                Self::Block::new(header, body)
1230                    .try_into_recovered_unchecked(senders)
1231                    .map_err(|_| ProviderError::SenderRecoveryError)
1232            },
1233        )
1234    }
1235
1236    fn recovered_block_range(
1237        &self,
1238        range: RangeInclusive<BlockNumber>,
1239    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1240        self.block_with_senders_range(
1241            range,
1242            |range| self.sealed_headers_range(range),
1243            |header, body, senders| {
1244                Self::Block::new_sealed(header, body)
1245                    .try_with_senders(senders)
1246                    .map_err(|_| ProviderError::SenderRecoveryError)
1247            },
1248        )
1249    }
1250
1251    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1252        Ok(self
1253            .tx
1254            .cursor_read::<tables::TransactionBlocks>()?
1255            .seek(id)
1256            .map(|b| b.map(|(_, bn)| bn))?)
1257    }
1258}
1259
1260impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1261    for DatabaseProvider<TX, N>
1262{
1263    /// Recovers transaction hashes by walking through `Transactions` table and
1264    /// calculating them in a parallel manner. Returned unsorted.
1265    fn transaction_hashes_by_range(
1266        &self,
1267        tx_range: Range<TxNumber>,
1268    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1269        self.static_file_provider.transaction_hashes_by_range(tx_range)
1270    }
1271}
1272
1273// Calculates the hash of the given transaction
1274impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1275    type Transaction = TxTy<N>;
1276
1277    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1278        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1279    }
1280
1281    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1282        self.static_file_provider.transaction_by_id(id)
1283    }
1284
1285    fn transaction_by_id_unhashed(
1286        &self,
1287        id: TxNumber,
1288    ) -> ProviderResult<Option<Self::Transaction>> {
1289        self.static_file_provider.transaction_by_id_unhashed(id)
1290    }
1291
1292    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1293        if let Some(id) = self.transaction_id(hash)? {
1294            Ok(self.transaction_by_id_unhashed(id)?)
1295        } else {
1296            Ok(None)
1297        }
1298    }
1299
1300    fn transaction_by_hash_with_meta(
1301        &self,
1302        tx_hash: TxHash,
1303    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1304        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1305            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1306            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1307            let Some(sealed_header) = self.sealed_header(block_number)?
1308        {
1309            let (header, block_hash) = sealed_header.split();
1310            if let Some(block_body) = self.block_body_indices(block_number)? {
1311                // the index of the tx in the block is the offset:
1312                // len([start..tx_id])
1313                // NOTE: `transaction_id` is always `>=` the block's first
1314                // index
1315                let index = transaction_id - block_body.first_tx_num();
1316
1317                let meta = TransactionMeta {
1318                    tx_hash,
1319                    index,
1320                    block_hash,
1321                    block_number,
1322                    base_fee: header.base_fee_per_gas(),
1323                    excess_blob_gas: header.excess_blob_gas(),
1324                    timestamp: header.timestamp(),
1325                };
1326
1327                return Ok(Some((transaction, meta)))
1328            }
1329        }
1330
1331        Ok(None)
1332    }
1333
1334    fn transactions_by_block(
1335        &self,
1336        id: BlockHashOrNumber,
1337    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1338        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1339            let Some(body) = self.block_body_indices(block_number)?
1340        {
1341            let tx_range = body.tx_num_range();
1342            return if tx_range.is_empty() {
1343                Ok(Some(Vec::new()))
1344            } else {
1345                self.transactions_by_tx_range(tx_range).map(Some)
1346            }
1347        }
1348        Ok(None)
1349    }
1350
1351    fn transactions_by_block_range(
1352        &self,
1353        range: impl RangeBounds<BlockNumber>,
1354    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1355        let range = to_range(range);
1356
1357        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1358            .into_iter()
1359            .map(|body| {
1360                let tx_num_range = body.tx_num_range();
1361                if tx_num_range.is_empty() {
1362                    Ok(Vec::new())
1363                } else {
1364                    self.transactions_by_tx_range(tx_num_range)
1365                }
1366            })
1367            .collect()
1368    }
1369
1370    fn transactions_by_tx_range(
1371        &self,
1372        range: impl RangeBounds<TxNumber>,
1373    ) -> ProviderResult<Vec<Self::Transaction>> {
1374        self.static_file_provider.transactions_by_tx_range(range)
1375    }
1376
1377    fn senders_by_tx_range(
1378        &self,
1379        range: impl RangeBounds<TxNumber>,
1380    ) -> ProviderResult<Vec<Address>> {
1381        if EitherWriterDestination::senders(self).is_static_file() {
1382            self.static_file_provider.senders_by_tx_range(range)
1383        } else {
1384            self.cursor_read_collect::<tables::TransactionSenders>(range)
1385        }
1386    }
1387
1388    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1389        if EitherWriterDestination::senders(self).is_static_file() {
1390            self.static_file_provider.transaction_sender(id)
1391        } else {
1392            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1393        }
1394    }
1395}
1396
1397impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1398    type Receipt = ReceiptTy<N>;
1399
1400    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1401        self.static_file_provider.get_with_static_file_or_database(
1402            StaticFileSegment::Receipts,
1403            id,
1404            |static_file| static_file.receipt(id),
1405            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1406        )
1407    }
1408
1409    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1410        if let Some(id) = self.transaction_id(hash)? {
1411            self.receipt(id)
1412        } else {
1413            Ok(None)
1414        }
1415    }
1416
1417    fn receipts_by_block(
1418        &self,
1419        block: BlockHashOrNumber,
1420    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1421        if let Some(number) = self.convert_hash_or_number(block)? &&
1422            let Some(body) = self.block_body_indices(number)?
1423        {
1424            let tx_range = body.tx_num_range();
1425            return if tx_range.is_empty() {
1426                Ok(Some(Vec::new()))
1427            } else {
1428                self.receipts_by_tx_range(tx_range).map(Some)
1429            }
1430        }
1431        Ok(None)
1432    }
1433
1434    fn receipts_by_tx_range(
1435        &self,
1436        range: impl RangeBounds<TxNumber>,
1437    ) -> ProviderResult<Vec<Self::Receipt>> {
1438        self.static_file_provider.get_range_with_static_file_or_database(
1439            StaticFileSegment::Receipts,
1440            to_range(range),
1441            |static_file, range, _| static_file.receipts_by_tx_range(range),
1442            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1443            |_| true,
1444        )
1445    }
1446
1447    fn receipts_by_block_range(
1448        &self,
1449        block_range: RangeInclusive<BlockNumber>,
1450    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1451        if block_range.is_empty() {
1452            return Ok(Vec::new());
1453        }
1454
1455        // collect block body indices for each block in the range
1456        let mut block_body_indices = Vec::new();
1457        for block_num in block_range {
1458            if let Some(indices) = self.block_body_indices(block_num)? {
1459                block_body_indices.push(indices);
1460            } else {
1461                // use default indices for missing blocks (empty block)
1462                block_body_indices.push(StoredBlockBodyIndices::default());
1463            }
1464        }
1465
1466        if block_body_indices.is_empty() {
1467            return Ok(Vec::new());
1468        }
1469
1470        // find blocks with transactions to determine transaction range
1471        let non_empty_blocks: Vec<_> =
1472            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1473
1474        if non_empty_blocks.is_empty() {
1475            // all blocks are empty
1476            return Ok(vec![Vec::new(); block_body_indices.len()]);
1477        }
1478
1479        // calculate the overall transaction range
1480        let first_tx = non_empty_blocks[0].first_tx_num();
1481        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1482
1483        // fetch all receipts in the transaction range
1484        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1485        let mut receipts_iter = all_receipts.into_iter();
1486
1487        // distribute receipts to their respective blocks
1488        let mut result = Vec::with_capacity(block_body_indices.len());
1489        for indices in &block_body_indices {
1490            if indices.tx_count == 0 {
1491                result.push(Vec::new());
1492            } else {
1493                let block_receipts =
1494                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1495                result.push(block_receipts);
1496            }
1497        }
1498
1499        Ok(result)
1500    }
1501}
1502
1503impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1504    for DatabaseProvider<TX, N>
1505{
1506    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1507        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1508    }
1509
1510    fn block_body_indices_range(
1511        &self,
1512        range: RangeInclusive<BlockNumber>,
1513    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1514        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1515    }
1516}
1517
1518impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1519    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1520        Ok(if let Some(encoded) = id.get_pre_encoded() {
1521            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1522        } else {
1523            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1524        })
1525    }
1526
1527    /// Get stage checkpoint progress.
1528    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1529        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1530    }
1531
1532    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1533        self.tx
1534            .cursor_read::<tables::StageCheckpoints>()?
1535            .walk(None)?
1536            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1537            .map_err(ProviderError::Database)
1538    }
1539}
1540
1541impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1542    /// Save stage checkpoint.
1543    fn save_stage_checkpoint(
1544        &self,
1545        id: StageId,
1546        checkpoint: StageCheckpoint,
1547    ) -> ProviderResult<()> {
1548        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1549    }
1550
1551    /// Save stage checkpoint progress.
1552    fn save_stage_checkpoint_progress(
1553        &self,
1554        id: StageId,
1555        checkpoint: Vec<u8>,
1556    ) -> ProviderResult<()> {
1557        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1558    }
1559
1560    fn update_pipeline_stages(
1561        &self,
1562        block_number: BlockNumber,
1563        drop_stage_checkpoint: bool,
1564    ) -> ProviderResult<()> {
1565        // iterate over all existing stages in the table and update its progress.
1566        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1567        for stage_id in StageId::ALL {
1568            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1569            cursor.upsert(
1570                stage_id.to_string(),
1571                &StageCheckpoint {
1572                    block_number,
1573                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1574                },
1575            )?;
1576        }
1577
1578        Ok(())
1579    }
1580}
1581
1582impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1583    fn plain_state_storages(
1584        &self,
1585        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1586    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1587        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1588
1589        addresses_with_keys
1590            .into_iter()
1591            .map(|(address, storage)| {
1592                storage
1593                    .into_iter()
1594                    .map(|key| -> ProviderResult<_> {
1595                        Ok(plain_storage
1596                            .seek_by_key_subkey(address, key)?
1597                            .filter(|v| v.key == key)
1598                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1599                    })
1600                    .collect::<ProviderResult<Vec<_>>>()
1601                    .map(|storage| (address, storage))
1602            })
1603            .collect::<ProviderResult<Vec<(_, _)>>>()
1604    }
1605
1606    fn changed_storages_with_range(
1607        &self,
1608        range: RangeInclusive<BlockNumber>,
1609    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1610        self.tx
1611            .cursor_read::<tables::StorageChangeSets>()?
1612            .walk_range(BlockNumberAddress::range(range))?
1613            // fold all storages and save its old state so we can remove it from HashedStorage
1614            // it is needed as it is dup table.
1615            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1616                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1617                accounts.entry(address).or_default().insert(storage_entry.key);
1618                Ok(accounts)
1619            })
1620    }
1621
1622    fn changed_storages_and_blocks_with_range(
1623        &self,
1624        range: RangeInclusive<BlockNumber>,
1625    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1626        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1627
1628        let storage_changeset_lists =
1629            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1630                BTreeMap::new(),
1631                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1632                    let (index, storage) = entry?;
1633                    storages
1634                        .entry((index.address(), storage.key))
1635                        .or_default()
1636                        .push(index.block_number());
1637                    Ok(storages)
1638                },
1639            )?;
1640
1641        Ok(storage_changeset_lists)
1642    }
1643}
1644
1645impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1646    for DatabaseProvider<TX, N>
1647{
1648    type Receipt = ReceiptTy<N>;
1649
1650    fn write_state(
1651        &self,
1652        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1653        is_value_known: OriginalValuesKnown,
1654    ) -> ProviderResult<()> {
1655        let first_block = execution_outcome.first_block();
1656        let block_count = execution_outcome.len() as u64;
1657        let last_block = execution_outcome.last_block();
1658        let block_range = first_block..=last_block;
1659
1660        let tip = self.last_block_number()?.max(last_block);
1661
1662        let (plain_state, reverts) =
1663            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1664
1665        self.write_state_reverts(reverts, first_block)?;
1666        self.write_state_changes(plain_state)?;
1667
1668        // Fetch the first transaction number for each block in the range
1669        let block_indices: Vec<_> = self
1670            .block_body_indices_range(block_range)?
1671            .into_iter()
1672            .map(|b| b.first_tx_num)
1673            .collect();
1674
1675        // Ensure all expected blocks are present.
1676        if block_indices.len() < block_count as usize {
1677            let missing_blocks = block_count - block_indices.len() as u64;
1678            return Err(ProviderError::BlockBodyIndicesNotFound(
1679                last_block.saturating_sub(missing_blocks - 1),
1680            ));
1681        }
1682
1683        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
1684
1685        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1686        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1687
1688        // All receipts from the last 128 blocks are required for blockchain tree, even with
1689        // [`PruneSegment::ContractLogs`].
1690        //
1691        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
1692        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
1693        // files yet. Once receipts exist in static files, we must continue writing to maintain
1694        // continuity and have no gaps.
1695        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
1696            self.static_file_provider()
1697                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1698                .is_none()) &&
1699            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
1700
1701        // Prepare set of addresses which logs should not be pruned.
1702        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1703        for (_, addresses) in contract_log_pruner.range(..first_block) {
1704            allowed_addresses.extend(addresses.iter().copied());
1705        }
1706
1707        for (idx, (receipts, first_tx_index)) in
1708            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1709        {
1710            let block_number = first_block + idx as u64;
1711
1712            // Increment block number for receipts static file writer
1713            receipts_writer.increment_block(block_number)?;
1714
1715            // Skip writing receipts if pruning configuration requires us to.
1716            if prunable_receipts &&
1717                self.prune_modes
1718                    .receipts
1719                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1720            {
1721                continue
1722            }
1723
1724            // If there are new addresses to retain after this block number, track them
1725            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1726                allowed_addresses.extend(new_addresses.iter().copied());
1727            }
1728
1729            for (idx, receipt) in receipts.iter().enumerate() {
1730                let receipt_idx = first_tx_index + idx as u64;
1731                // Skip writing receipt if log filter is active and it does not have any logs to
1732                // retain
1733                if prunable_receipts &&
1734                    has_contract_log_filter &&
1735                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1736                {
1737                    continue
1738                }
1739
1740                receipts_writer.append_receipt(receipt_idx, receipt)?;
1741            }
1742        }
1743
1744        Ok(())
1745    }
1746
1747    fn write_state_reverts(
1748        &self,
1749        reverts: PlainStateReverts,
1750        first_block: BlockNumber,
1751    ) -> ProviderResult<()> {
1752        // Write storage changes
1753        tracing::trace!("Writing storage changes");
1754        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1755        let mut storage_changeset_cursor =
1756            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1757        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1758            let block_number = first_block + block_index as BlockNumber;
1759
1760            tracing::trace!(block_number, "Writing block change");
1761            // sort changes by address.
1762            storage_changes.par_sort_unstable_by_key(|a| a.address);
1763            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1764                let storage_id = BlockNumberAddress((block_number, address));
1765
1766                let mut storage = storage_revert
1767                    .into_iter()
1768                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1769                    .collect::<Vec<_>>();
1770                // sort storage slots by key.
1771                storage.par_sort_unstable_by_key(|a| a.0);
1772
1773                // If we are writing the primary storage wipe transition, the pre-existing plain
1774                // storage state has to be taken from the database and written to storage history.
1775                // See [StorageWipe::Primary] for more details.
1776                //
1777                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
1778                // collecting wiped entries into a Vec like this, see
1779                // `write_storage_trie_changesets`.
1780                let mut wiped_storage = Vec::new();
1781                if wiped {
1782                    tracing::trace!(?address, "Wiping storage");
1783                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1784                        wiped_storage.push((entry.key, entry.value));
1785                        while let Some(entry) = storages_cursor.next_dup_val()? {
1786                            wiped_storage.push((entry.key, entry.value))
1787                        }
1788                    }
1789                }
1790
1791                tracing::trace!(?address, ?storage, "Writing storage reverts");
1792                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1793                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1794                }
1795            }
1796        }
1797
1798        // Write account changes
1799        tracing::trace!("Writing account changes");
1800        let mut account_changeset_cursor =
1801            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1802
1803        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1804            let block_number = first_block + block_index as BlockNumber;
1805            // Sort accounts by address.
1806            account_block_reverts.par_sort_by_key(|a| a.0);
1807
1808            for (address, info) in account_block_reverts {
1809                account_changeset_cursor.append_dup(
1810                    block_number,
1811                    AccountBeforeTx { address, info: info.map(Into::into) },
1812                )?;
1813            }
1814        }
1815
1816        Ok(())
1817    }
1818
1819    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1820        // sort all entries so they can be written to database in more performant way.
1821        // and take smaller memory footprint.
1822        changes.accounts.par_sort_by_key(|a| a.0);
1823        changes.storage.par_sort_by_key(|a| a.address);
1824        changes.contracts.par_sort_by_key(|a| a.0);
1825
1826        // Write new account state
1827        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1828        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1829        // write account to database.
1830        for (address, account) in changes.accounts {
1831            if let Some(account) = account {
1832                tracing::trace!(?address, "Updating plain state account");
1833                accounts_cursor.upsert(address, &account.into())?;
1834            } else if accounts_cursor.seek_exact(address)?.is_some() {
1835                tracing::trace!(?address, "Deleting plain state account");
1836                accounts_cursor.delete_current()?;
1837            }
1838        }
1839
1840        // Write bytecode
1841        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1842        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1843        for (hash, bytecode) in changes.contracts {
1844            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1845        }
1846
1847        // Write new storage state and wipe storage if needed.
1848        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1849        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1850        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1851            // Wiping of storage.
1852            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1853                storages_cursor.delete_current_duplicates()?;
1854            }
1855            // cast storages to B256.
1856            let mut storage = storage
1857                .into_iter()
1858                .map(|(k, value)| StorageEntry { key: k.into(), value })
1859                .collect::<Vec<_>>();
1860            // sort storage slots by key.
1861            storage.par_sort_unstable_by_key(|a| a.key);
1862
1863            for entry in storage {
1864                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1865                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1866                    db_entry.key == entry.key
1867                {
1868                    storages_cursor.delete_current()?;
1869                }
1870
1871                if !entry.value.is_zero() {
1872                    storages_cursor.upsert(address, &entry)?;
1873                }
1874            }
1875        }
1876
1877        Ok(())
1878    }
1879
1880    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1881        // Write hashed account updates.
1882        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1883        for (hashed_address, account) in hashed_state.accounts() {
1884            if let Some(account) = account {
1885                hashed_accounts_cursor.upsert(*hashed_address, account)?;
1886            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
1887                hashed_accounts_cursor.delete_current()?;
1888            }
1889        }
1890
1891        // Write hashed storage changes.
1892        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1893        let mut hashed_storage_cursor =
1894            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1895        for (hashed_address, storage) in sorted_storages {
1896            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1897                hashed_storage_cursor.delete_current_duplicates()?;
1898            }
1899
1900            for (hashed_slot, value) in storage.storage_slots_ref() {
1901                let entry = StorageEntry { key: *hashed_slot, value: *value };
1902
1903                if let Some(db_entry) =
1904                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1905                    db_entry.key == entry.key
1906                {
1907                    hashed_storage_cursor.delete_current()?;
1908                }
1909
1910                if !entry.value.is_zero() {
1911                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1912                }
1913            }
1914        }
1915
1916        Ok(())
1917    }
1918
1919    /// Remove the last N blocks of state.
1920    ///
1921    /// The latest state will be unwound
1922    ///
1923    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1924    ///    transaction ids.
1925    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1926    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1927    ///    the changesets.
1928    ///    - In order to have both the old and new values in the changesets, we also access the
1929    ///      plain state tables.
1930    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1931    ///    we:
1932    ///     1. Take the old value from the changeset
1933    ///     2. Take the new value from the plain state
1934    ///     3. Save the old value to the local state
1935    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1936    ///    have seen before we:
1937    ///     1. Take the old value from the changeset
1938    ///     2. Take the new value from the local state
1939    ///     3. Set the local state to the value in the changeset
1940    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1941        let range = block + 1..=self.last_block_number()?;
1942
1943        if range.is_empty() {
1944            return Ok(());
1945        }
1946
1947        // We are not removing block meta as it is used to get block changesets.
1948        let block_bodies = self.block_body_indices_range(range.clone())?;
1949
1950        // get transaction receipts
1951        let from_transaction_num =
1952            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1953
1954        let storage_range = BlockNumberAddress::range(range.clone());
1955
1956        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1957        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1958
1959        // This is not working for blocks that are not at tip. as plain state is not the last
1960        // state of end range. We should rename the functions or add support to access
1961        // History state. Accessing history state can be tricky but we are not gaining
1962        // anything.
1963        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1964        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1965
1966        let (state, _) = self.populate_bundle_state(
1967            account_changeset,
1968            storage_changeset,
1969            &mut plain_accounts_cursor,
1970            &mut plain_storage_cursor,
1971        )?;
1972
1973        // iterate over local plain state remove all account and all storages.
1974        for (address, (old_account, new_account, storage)) in &state {
1975            // revert account if needed.
1976            if old_account != new_account {
1977                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1978                if let Some(account) = old_account {
1979                    plain_accounts_cursor.upsert(*address, account)?;
1980                } else if existing_entry.is_some() {
1981                    plain_accounts_cursor.delete_current()?;
1982                }
1983            }
1984
1985            // revert storages
1986            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1987                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1988                // delete previous value
1989                if plain_storage_cursor
1990                    .seek_by_key_subkey(*address, *storage_key)?
1991                    .filter(|s| s.key == *storage_key)
1992                    .is_some()
1993                {
1994                    plain_storage_cursor.delete_current()?
1995                }
1996
1997                // insert value if needed
1998                if !old_storage_value.is_zero() {
1999                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2000                }
2001            }
2002        }
2003
2004        self.remove_receipts_from(from_transaction_num, block)?;
2005
2006        Ok(())
2007    }
2008
2009    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2010    ///
2011    /// The latest state will be unwound and returned back with all the blocks
2012    ///
2013    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2014    ///    transaction ids.
2015    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2016    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2017    ///    the changesets.
2018    ///    - In order to have both the old and new values in the changesets, we also access the
2019    ///      plain state tables.
2020    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2021    ///    we:
2022    ///     1. Take the old value from the changeset
2023    ///     2. Take the new value from the plain state
2024    ///     3. Save the old value to the local state
2025    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2026    ///    have seen before we:
2027    ///     1. Take the old value from the changeset
2028    ///     2. Take the new value from the local state
2029    ///     3. Set the local state to the value in the changeset
2030    fn take_state_above(
2031        &self,
2032        block: BlockNumber,
2033    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2034        let range = block + 1..=self.last_block_number()?;
2035
2036        if range.is_empty() {
2037            return Ok(ExecutionOutcome::default())
2038        }
2039        let start_block_number = *range.start();
2040
2041        // We are not removing block meta as it is used to get block changesets.
2042        let block_bodies = self.block_body_indices_range(range.clone())?;
2043
2044        // get transaction receipts
2045        let from_transaction_num =
2046            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2047        let to_transaction_num =
2048            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2049
2050        let storage_range = BlockNumberAddress::range(range.clone());
2051
2052        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2053        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2054
2055        // This is not working for blocks that are not at tip. as plain state is not the last
2056        // state of end range. We should rename the functions or add support to access
2057        // History state. Accessing history state can be tricky but we are not gaining
2058        // anything.
2059        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2060        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2061
2062        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2063        // remove, and return later
2064        let (state, reverts) = self.populate_bundle_state(
2065            account_changeset,
2066            storage_changeset,
2067            &mut plain_accounts_cursor,
2068            &mut plain_storage_cursor,
2069        )?;
2070
2071        // iterate over local plain state remove all account and all storages.
2072        for (address, (old_account, new_account, storage)) in &state {
2073            // revert account if needed.
2074            if old_account != new_account {
2075                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2076                if let Some(account) = old_account {
2077                    plain_accounts_cursor.upsert(*address, account)?;
2078                } else if existing_entry.is_some() {
2079                    plain_accounts_cursor.delete_current()?;
2080                }
2081            }
2082
2083            // revert storages
2084            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2085                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2086                // delete previous value
2087                if plain_storage_cursor
2088                    .seek_by_key_subkey(*address, *storage_key)?
2089                    .filter(|s| s.key == *storage_key)
2090                    .is_some()
2091                {
2092                    plain_storage_cursor.delete_current()?
2093                }
2094
2095                // insert value if needed
2096                if !old_storage_value.is_zero() {
2097                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2098                }
2099            }
2100        }
2101
2102        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2103        let mut receipts_iter = self
2104            .static_file_provider
2105            .get_range_with_static_file_or_database(
2106                StaticFileSegment::Receipts,
2107                from_transaction_num..to_transaction_num + 1,
2108                |static_file, range, _| {
2109                    static_file
2110                        .receipts_by_tx_range(range.clone())
2111                        .map(|r| range.into_iter().zip(r).collect())
2112                },
2113                |range, _| {
2114                    self.tx
2115                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2116                        .walk_range(range)?
2117                        .map(|r| r.map_err(Into::into))
2118                        .collect()
2119                },
2120                |_| true,
2121            )?
2122            .into_iter()
2123            .peekable();
2124
2125        let mut receipts = Vec::with_capacity(block_bodies.len());
2126        // loop break if we are at the end of the blocks.
2127        for block_body in block_bodies {
2128            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2129            for num in block_body.tx_num_range() {
2130                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2131                    block_receipts.push(receipts_iter.next().unwrap().1);
2132                }
2133            }
2134            receipts.push(block_receipts);
2135        }
2136
2137        self.remove_receipts_from(from_transaction_num, block)?;
2138
2139        Ok(ExecutionOutcome::new_init(
2140            state,
2141            reverts,
2142            Vec::new(),
2143            receipts,
2144            start_block_number,
2145            Vec::new(),
2146        ))
2147    }
2148}
2149
2150impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2151    /// Writes trie updates to the database with already sorted updates.
2152    ///
2153    /// Returns the number of entries modified.
2154    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2155        if trie_updates.is_empty() {
2156            return Ok(0)
2157        }
2158
2159        // Track the number of inserted entries.
2160        let mut num_entries = 0;
2161
2162        let tx = self.tx_ref();
2163        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2164
2165        // Process sorted account nodes
2166        for (key, updated_node) in trie_updates.account_nodes_ref() {
2167            let nibbles = StoredNibbles(*key);
2168            match updated_node {
2169                Some(node) => {
2170                    if !nibbles.0.is_empty() {
2171                        num_entries += 1;
2172                        account_trie_cursor.upsert(nibbles, node)?;
2173                    }
2174                }
2175                None => {
2176                    num_entries += 1;
2177                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2178                        account_trie_cursor.delete_current()?;
2179                    }
2180                }
2181            }
2182        }
2183
2184        num_entries +=
2185            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2186
2187        Ok(num_entries)
2188    }
2189
2190    /// Records the current values of all trie nodes which will be updated using the `TrieUpdates`
2191    /// into the trie changesets tables.
2192    ///
2193    /// The intended usage of this method is to call it _prior_ to calling `write_trie_updates` with
2194    /// the same `TrieUpdates`.
2195    ///
2196    /// Returns the number of keys written.
2197    fn write_trie_changesets(
2198        &self,
2199        block_number: BlockNumber,
2200        trie_updates: &TrieUpdatesSorted,
2201        updates_overlay: Option<&TrieUpdatesSorted>,
2202    ) -> ProviderResult<usize> {
2203        let mut num_entries = 0;
2204
2205        let mut changeset_cursor =
2206            self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2207        let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2208
2209        // Wrap the cursor in DatabaseAccountTrieCursor
2210        let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2211
2212        // Create empty TrieUpdatesSorted for when updates_overlay is None
2213        let empty_updates = TrieUpdatesSorted::default();
2214        let overlay = updates_overlay.unwrap_or(&empty_updates);
2215
2216        // Wrap the cursor in InMemoryTrieCursor with the overlay
2217        let mut in_memory_account_cursor =
2218            InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);
2219
2220        for (path, _) in trie_updates.account_nodes_ref() {
2221            num_entries += 1;
2222            let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2223            changeset_cursor.append_dup(
2224                block_number,
2225                TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2226            )?;
2227        }
2228
2229        let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2230        storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2231
2232        num_entries += self.write_storage_trie_changesets(
2233            block_number,
2234            storage_updates.into_iter(),
2235            updates_overlay,
2236        )?;
2237
2238        Ok(num_entries)
2239    }
2240
2241    fn clear_trie_changesets(&self) -> ProviderResult<()> {
2242        let tx = self.tx_ref();
2243        tx.clear::<tables::AccountsTrieChangeSets>()?;
2244        tx.clear::<tables::StoragesTrieChangeSets>()?;
2245        Ok(())
2246    }
2247
2248    fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2249        let tx = self.tx_ref();
2250        {
2251            let range = from..;
2252            let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2253            let mut walker = cursor.walk_range(range)?;
2254
2255            while walker.next().transpose()?.is_some() {
2256                walker.delete_current()?;
2257            }
2258        }
2259
2260        {
2261            let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2262            let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2263            let mut walker = cursor.walk_range(range)?;
2264
2265            while walker.next().transpose()?.is_some() {
2266                walker.delete_current()?;
2267            }
2268        }
2269
2270        Ok(())
2271    }
2272}
2273
2274impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2275    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2276        let tx = self.tx_ref();
2277
2278        // Read account trie changes directly into a Vec - data is already sorted by nibbles
2279        // within each block, and we want the oldest (first) version of each node sorted by path.
2280        let mut account_nodes = Vec::new();
2281        let mut seen_account_keys = HashSet::new();
2282        let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2283
2284        for entry in accounts_cursor.walk_range(from..)? {
2285            let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2286            // Only keep the first (oldest) version of each node
2287            if seen_account_keys.insert(nibbles.0) {
2288                account_nodes.push((nibbles.0, node));
2289            }
2290        }
2291
2292        account_nodes.sort_by_key(|(path, _)| *path);
2293
2294        // Read storage trie changes - data is sorted by (block, hashed_address, nibbles)
2295        // Keep track of seen (address, nibbles) pairs to only keep the oldest version per address,
2296        // sorted by path.
2297        let mut storage_tries = B256Map::<Vec<_>>::default();
2298        let mut seen_storage_keys = HashSet::new();
2299        let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2300
2301        // Create storage range starting from `from` block
2302        let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2303
2304        for entry in storages_cursor.walk_range(storage_range_start..)? {
2305            let (
2306                BlockNumberHashedAddress((_, hashed_address)),
2307                TrieChangeSetsEntry { nibbles, node },
2308            ) = entry?;
2309
2310            // Only keep the first (oldest) version of each node for this address
2311            if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2312                storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2313            }
2314        }
2315
2316        // Convert to StorageTrieUpdatesSorted
2317        let storage_tries = storage_tries
2318            .into_iter()
2319            .map(|(address, mut nodes)| {
2320                nodes.sort_by_key(|(path, _)| *path);
2321                (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2322            })
2323            .collect();
2324
2325        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2326    }
2327
2328    fn get_block_trie_updates(
2329        &self,
2330        block_number: BlockNumber,
2331    ) -> ProviderResult<TrieUpdatesSorted> {
2332        let tx = self.tx_ref();
2333
2334        // Step 1: Get the trie reverts for the state after the target block
2335        let reverts = self.trie_reverts(block_number + 1)?;
2336
2337        // Step 2: Create an InMemoryTrieCursorFactory with the reverts
2338        // This gives us the trie state as it was after the target block was processed
2339        let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2340        let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2341
2342        // Step 3: Collect all account trie nodes that changed in the target block
2343        let mut account_nodes = Vec::new();
2344
2345        // Walk through all account trie changes for this block
2346        let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2347        let mut account_cursor = cursor_factory.account_trie_cursor()?;
2348
2349        for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2350            let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2351            // Look up the current value of this trie node using the overlay cursor
2352            let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2353            account_nodes.push((nibbles.0, node_value));
2354        }
2355
2356        // Step 4: Collect all storage trie nodes that changed in the target block
2357        let mut storage_tries = B256Map::default();
2358        let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2359        let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2360        let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2361
2362        let mut current_hashed_address = None;
2363        let mut storage_cursor = None;
2364
2365        for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2366            let (
2367                BlockNumberHashedAddress((_, hashed_address)),
2368                TrieChangeSetsEntry { nibbles, .. },
2369            ) = entry?;
2370
2371            // Check if we need to create a new storage cursor for a different account
2372            if current_hashed_address != Some(hashed_address) {
2373                storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2374                current_hashed_address = Some(hashed_address);
2375            }
2376
2377            // Look up the current value of this storage trie node
2378            let cursor =
2379                storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2380            let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2381            storage_tries
2382                .entry(hashed_address)
2383                .or_insert_with(|| StorageTrieUpdatesSorted {
2384                    storage_nodes: Vec::new(),
2385                    is_deleted: false,
2386                })
2387                .storage_nodes
2388                .push((nibbles.0, node_value));
2389        }
2390
2391        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2392    }
2393}
2394
2395impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2396    /// Writes storage trie updates from the given storage trie map with already sorted updates.
2397    ///
2398    /// Expects the storage trie updates to already be sorted by the hashed address key.
2399    ///
2400    /// Returns the number of entries modified.
2401    fn write_storage_trie_updates_sorted<'a>(
2402        &self,
2403        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2404    ) -> ProviderResult<usize> {
2405        let mut num_entries = 0;
2406        let mut storage_tries = storage_tries.collect::<Vec<_>>();
2407        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2408        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2409        for (hashed_address, storage_trie_updates) in storage_tries {
2410            let mut db_storage_trie_cursor =
2411                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2412            num_entries +=
2413                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2414            cursor = db_storage_trie_cursor.cursor;
2415        }
2416
2417        Ok(num_entries)
2418    }
2419
2420    /// Records the current values of all trie nodes which will be updated using the
2421    /// `StorageTrieUpdates` into the storage trie changesets table.
2422    ///
2423    /// The intended usage of this method is to call it _prior_ to calling
2424    /// `write_storage_trie_updates` with the same set of `StorageTrieUpdates`.
2425    ///
2426    /// Returns the number of keys written.
2427    fn write_storage_trie_changesets<'a>(
2428        &self,
2429        block_number: BlockNumber,
2430        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2431        updates_overlay: Option<&TrieUpdatesSorted>,
2432    ) -> ProviderResult<usize> {
2433        let mut num_written = 0;
2434
2435        let mut changeset_cursor =
2436            self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2437
2438        // We hold two cursors to the same table because we use them simultaneously when an
2439        // account's storage is wiped. We keep them outside the for-loop so they can be re-used
2440        // between accounts.
2441        let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2442        let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2443
2444        // DatabaseStorageTrieCursor requires ownership of the cursor. The easiest way to deal with
2445        // this is to create this outer variable with an initial dummy account, and overwrite it on
2446        // every loop for every real account.
2447        let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
2448            changed_curr_values_cursor,
2449            B256::default(), // Will be set per iteration
2450        );
2451        let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
2452            wiped_nodes_cursor,
2453            B256::default(), // Will be set per iteration
2454        );
2455
2456        // Create empty TrieUpdatesSorted for when updates_overlay is None
2457        let empty_updates = TrieUpdatesSorted::default();
2458
2459        for (hashed_address, storage_trie_updates) in storage_tries {
2460            let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2461
2462            // Update the hashed address for the cursors
2463            changed_curr_values_cursor =
2464                DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2465
2466            // Get the overlay updates, or use empty updates
2467            let overlay = updates_overlay.unwrap_or(&empty_updates);
2468
2469            // Wrap the cursor in InMemoryTrieCursor with the overlay
2470            let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
2471                &mut changed_curr_values_cursor,
2472                overlay,
2473                *hashed_address,
2474            );
2475
2476            // Create an iterator which produces the current values of all updated paths, or None if
2477            // they are currently unset.
2478            let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2479                storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2480                &mut in_memory_changed_cursor,
2481            )?;
2482
2483            if storage_trie_updates.is_deleted() {
2484                // Create an iterator that starts from the beginning of the storage trie for this
2485                // account
2486                wiped_nodes_cursor =
2487                    DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2488
2489                // Wrap the wiped nodes cursor in InMemoryTrieCursor with the overlay
2490                let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
2491                    &mut wiped_nodes_cursor,
2492                    overlay,
2493                    *hashed_address,
2494                );
2495
2496                let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2497
2498                for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2499                    let (path, node) = wiped?;
2500                    num_written += 1;
2501                    changeset_cursor.append_dup(
2502                        changeset_key,
2503                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2504                    )?;
2505                }
2506            } else {
2507                for curr_value in curr_values_of_changed {
2508                    let (path, node) = curr_value?;
2509                    num_written += 1;
2510                    changeset_cursor.append_dup(
2511                        changeset_key,
2512                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2513                    )?;
2514                }
2515            }
2516        }
2517
2518        Ok(num_written)
2519    }
2520}
2521
2522impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2523    fn unwind_account_hashing<'a>(
2524        &self,
2525        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2526    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2527        // Aggregate all block changesets and make a list of accounts that have been changed.
2528        // Note that collecting and then reversing the order is necessary to ensure that the
2529        // changes are applied in the correct order.
2530        let hashed_accounts = changesets
2531            .into_iter()
2532            .map(|(_, e)| (keccak256(e.address), e.info))
2533            .collect::<Vec<_>>()
2534            .into_iter()
2535            .rev()
2536            .collect::<BTreeMap<_, _>>();
2537
2538        // Apply values to HashedState, and remove the account if it's None.
2539        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2540        for (hashed_address, account) in &hashed_accounts {
2541            if let Some(account) = account {
2542                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2543            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2544                hashed_accounts_cursor.delete_current()?;
2545            }
2546        }
2547
2548        Ok(hashed_accounts)
2549    }
2550
2551    fn unwind_account_hashing_range(
2552        &self,
2553        range: impl RangeBounds<BlockNumber>,
2554    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2555        let changesets = self
2556            .tx
2557            .cursor_read::<tables::AccountChangeSets>()?
2558            .walk_range(range)?
2559            .collect::<Result<Vec<_>, _>>()?;
2560        self.unwind_account_hashing(changesets.iter())
2561    }
2562
2563    fn insert_account_for_hashing(
2564        &self,
2565        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2566    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2567        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2568        let hashed_accounts =
2569            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2570        for (hashed_address, account) in &hashed_accounts {
2571            if let Some(account) = account {
2572                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2573            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2574                hashed_accounts_cursor.delete_current()?;
2575            }
2576        }
2577        Ok(hashed_accounts)
2578    }
2579
2580    fn unwind_storage_hashing(
2581        &self,
2582        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2583    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2584        // Aggregate all block changesets and make list of accounts that have been changed.
2585        let mut hashed_storages = changesets
2586            .into_iter()
2587            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2588                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2589            })
2590            .collect::<Vec<_>>();
2591        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2592
2593        // Apply values to HashedState, and remove the account if it's None.
2594        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2595            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2596        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2597        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2598            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2599
2600            if hashed_storage
2601                .seek_by_key_subkey(hashed_address, key)?
2602                .filter(|entry| entry.key == key)
2603                .is_some()
2604            {
2605                hashed_storage.delete_current()?;
2606            }
2607
2608            if !value.is_zero() {
2609                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2610            }
2611        }
2612        Ok(hashed_storage_keys)
2613    }
2614
2615    fn unwind_storage_hashing_range(
2616        &self,
2617        range: impl RangeBounds<BlockNumberAddress>,
2618    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2619        let changesets = self
2620            .tx
2621            .cursor_read::<tables::StorageChangeSets>()?
2622            .walk_range(range)?
2623            .collect::<Result<Vec<_>, _>>()?;
2624        self.unwind_storage_hashing(changesets.into_iter())
2625    }
2626
2627    fn insert_storage_for_hashing(
2628        &self,
2629        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2630    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2631        // hash values
2632        let hashed_storages =
2633            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2634                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2635                    map.insert(keccak256(entry.key), entry.value);
2636                    map
2637                });
2638                map.insert(keccak256(address), storage);
2639                map
2640            });
2641
2642        let hashed_storage_keys = hashed_storages
2643            .iter()
2644            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2645            .collect();
2646
2647        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2648        // Hash the address and key and apply them to HashedStorage (if Storage is None
2649        // just remove it);
2650        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2651            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2652                if hashed_storage_cursor
2653                    .seek_by_key_subkey(hashed_address, key)?
2654                    .filter(|entry| entry.key == key)
2655                    .is_some()
2656                {
2657                    hashed_storage_cursor.delete_current()?;
2658                }
2659
2660                if !value.is_zero() {
2661                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2662                }
2663                Ok(())
2664            })
2665        })?;
2666
2667        Ok(hashed_storage_keys)
2668    }
2669}
2670
2671impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2672    fn unwind_account_history_indices<'a>(
2673        &self,
2674        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2675    ) -> ProviderResult<usize> {
2676        let mut last_indices = changesets
2677            .into_iter()
2678            .map(|(index, account)| (account.address, *index))
2679            .collect::<Vec<_>>();
2680        last_indices.sort_by_key(|(a, _)| *a);
2681
2682        // Unwind the account history index.
2683        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2684        for &(address, rem_index) in &last_indices {
2685            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2686                &mut cursor,
2687                ShardedKey::last(address),
2688                rem_index,
2689                |sharded_key| sharded_key.key == address,
2690            )?;
2691
2692            // Check the last returned partial shard.
2693            // If it's not empty, the shard needs to be reinserted.
2694            if !partial_shard.is_empty() {
2695                cursor.insert(
2696                    ShardedKey::last(address),
2697                    &BlockNumberList::new_pre_sorted(partial_shard),
2698                )?;
2699            }
2700        }
2701
2702        let changesets = last_indices.len();
2703        Ok(changesets)
2704    }
2705
2706    fn unwind_account_history_indices_range(
2707        &self,
2708        range: impl RangeBounds<BlockNumber>,
2709    ) -> ProviderResult<usize> {
2710        let changesets = self
2711            .tx
2712            .cursor_read::<tables::AccountChangeSets>()?
2713            .walk_range(range)?
2714            .collect::<Result<Vec<_>, _>>()?;
2715        self.unwind_account_history_indices(changesets.iter())
2716    }
2717
2718    fn insert_account_history_index(
2719        &self,
2720        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2721    ) -> ProviderResult<()> {
2722        self.append_history_index::<_, tables::AccountsHistory>(
2723            account_transitions,
2724            ShardedKey::new,
2725        )
2726    }
2727
2728    fn unwind_storage_history_indices(
2729        &self,
2730        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2731    ) -> ProviderResult<usize> {
2732        let mut storage_changesets = changesets
2733            .into_iter()
2734            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2735            .collect::<Vec<_>>();
2736        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2737
2738        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2739        for &(address, storage_key, rem_index) in &storage_changesets {
2740            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2741                &mut cursor,
2742                StorageShardedKey::last(address, storage_key),
2743                rem_index,
2744                |storage_sharded_key| {
2745                    storage_sharded_key.address == address &&
2746                        storage_sharded_key.sharded_key.key == storage_key
2747                },
2748            )?;
2749
2750            // Check the last returned partial shard.
2751            // If it's not empty, the shard needs to be reinserted.
2752            if !partial_shard.is_empty() {
2753                cursor.insert(
2754                    StorageShardedKey::last(address, storage_key),
2755                    &BlockNumberList::new_pre_sorted(partial_shard),
2756                )?;
2757            }
2758        }
2759
2760        let changesets = storage_changesets.len();
2761        Ok(changesets)
2762    }
2763
2764    fn unwind_storage_history_indices_range(
2765        &self,
2766        range: impl RangeBounds<BlockNumberAddress>,
2767    ) -> ProviderResult<usize> {
2768        let changesets = self
2769            .tx
2770            .cursor_read::<tables::StorageChangeSets>()?
2771            .walk_range(range)?
2772            .collect::<Result<Vec<_>, _>>()?;
2773        self.unwind_storage_history_indices(changesets.into_iter())
2774    }
2775
2776    fn insert_storage_history_index(
2777        &self,
2778        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2779    ) -> ProviderResult<()> {
2780        self.append_history_index::<_, tables::StoragesHistory>(
2781            storage_transitions,
2782            |(address, storage_key), highest_block_number| {
2783                StorageShardedKey::new(address, storage_key, highest_block_number)
2784            },
2785        )
2786    }
2787
2788    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2789        // account history stage
2790        {
2791            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2792            self.insert_account_history_index(indices)?;
2793        }
2794
2795        // storage history stage
2796        {
2797            let indices = self.changed_storages_and_blocks_with_range(range)?;
2798            self.insert_storage_history_index(indices)?;
2799        }
2800
2801        Ok(())
2802    }
2803}
2804
2805impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2806    for DatabaseProvider<TX, N>
2807{
2808    fn take_block_and_execution_above(
2809        &self,
2810        block: BlockNumber,
2811    ) -> ProviderResult<Chain<Self::Primitives>> {
2812        let range = block + 1..=self.last_block_number()?;
2813
2814        self.unwind_trie_state_from(block + 1)?;
2815
2816        // get execution res
2817        let execution_state = self.take_state_above(block)?;
2818
2819        let blocks = self.recovered_block_range(range)?;
2820
2821        // remove block bodies it is needed for both get block range and get block execution results
2822        // that is why it is deleted afterwards.
2823        self.remove_blocks_above(block)?;
2824
2825        // Update pipeline progress
2826        self.update_pipeline_stages(block, true)?;
2827
2828        Ok(Chain::new(blocks, execution_state, None))
2829    }
2830
2831    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2832        self.unwind_trie_state_from(block + 1)?;
2833
2834        // remove execution res
2835        self.remove_state_above(block)?;
2836
2837        // remove block bodies it is needed for both get block range and get block execution results
2838        // that is why it is deleted afterwards.
2839        self.remove_blocks_above(block)?;
2840
2841        // Update pipeline progress
2842        self.update_pipeline_stages(block, true)?;
2843
2844        Ok(())
2845    }
2846}
2847
2848impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2849    for DatabaseProvider<TX, N>
2850{
2851    type Block = BlockTy<N>;
2852    type Receipt = ReceiptTy<N>;
2853
2854    /// Inserts the block into the database, always modifying the following static file segments and
2855    /// tables:
2856    /// * [`StaticFileSegment::Headers`]
2857    /// * [`tables::HeaderNumbers`]
2858    /// * [`tables::BlockBodyIndices`]
2859    ///
2860    /// If there are transactions in the block, the following static file segments and tables will
2861    /// be modified:
2862    /// * [`StaticFileSegment::Transactions`]
2863    /// * [`tables::TransactionBlocks`]
2864    ///
2865    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2866    /// If withdrawals are not empty, this will modify
2867    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2868    ///
2869    /// If the provider has __not__ configured full sender pruning, this will modify either:
2870    /// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
2871    /// * [`tables::TransactionSenders`] if senders are written to the database
2872    ///
2873    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2874    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2875    fn insert_block(
2876        &self,
2877        block: &RecoveredBlock<Self::Block>,
2878    ) -> ProviderResult<StoredBlockBodyIndices> {
2879        let block_number = block.number();
2880        let tx_count = block.body().transaction_count() as u64;
2881
2882        let mut durations_recorder = metrics::DurationsRecorder::default();
2883
2884        self.static_file_provider
2885            .get_writer(block_number, StaticFileSegment::Headers)?
2886            .append_header(block.header(), &block.hash())?;
2887
2888        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2889        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2890
2891        let first_tx_num = self
2892            .tx
2893            .cursor_read::<tables::TransactionBlocks>()?
2894            .last()?
2895            .map(|(n, _)| n + 1)
2896            .unwrap_or_default();
2897        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2898
2899        let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
2900
2901        if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2902            let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
2903            senders_writer.increment_block(block.number())?;
2904            senders_writer
2905                .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
2906            durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
2907        }
2908
2909        if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2910            for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
2911                let hash = transaction.tx_hash();
2912                self.tx.put::<tables::TransactionHashNumbers>(*hash, tx_num)?;
2913            }
2914            durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
2915        }
2916
2917        self.append_block_bodies(vec![(block_number, Some(block.body()))])?;
2918
2919        debug!(
2920            target: "providers::db",
2921            ?block_number,
2922            actions = ?durations_recorder.actions,
2923            "Inserted block"
2924        );
2925
2926        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2927    }
2928
2929    fn append_block_bodies(
2930        &self,
2931        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
2932    ) -> ProviderResult<()> {
2933        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2934
2935        // Initialize writer if we will be writing transactions to staticfiles
2936        let mut tx_writer =
2937            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2938
2939        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2940        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2941
2942        // Get id for the next tx_num or zero if there are no transactions.
2943        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2944
2945        for (block_number, body) in &bodies {
2946            // Increment block on static file header.
2947            tx_writer.increment_block(*block_number)?;
2948
2949            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2950            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2951
2952            let mut durations_recorder = metrics::DurationsRecorder::default();
2953
2954            // insert block meta
2955            block_indices_cursor.append(*block_number, &block_indices)?;
2956
2957            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2958
2959            let Some(body) = body else { continue };
2960
2961            // write transaction block index
2962            if !body.transactions().is_empty() {
2963                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2964                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2965            }
2966
2967            // write transactions
2968            for transaction in body.transactions() {
2969                tx_writer.append_transaction(next_tx_num, transaction)?;
2970
2971                // Increment transaction id for each transaction.
2972                next_tx_num += 1;
2973            }
2974        }
2975
2976        self.storage.writer().write_block_bodies(self, bodies)?;
2977
2978        Ok(())
2979    }
2980
2981    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2982        let last_block_number = self.last_block_number()?;
2983        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
2984        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
2985            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2986        }
2987
2988        // Get highest static file block for the total block range
2989        let highest_static_file_block = self
2990            .static_file_provider()
2991            .get_highest_static_file_block(StaticFileSegment::Headers)
2992            .expect("todo: error handling, headers should exist");
2993
2994        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
2995        // we remove only what is ABOVE the block.
2996        //
2997        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
2998        // will have three blocks to remove, which will be block 8, 7, and 6.
2999        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3000        self.static_file_provider()
3001            .get_writer(block, StaticFileSegment::Headers)?
3002            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3003
3004        // First transaction to be removed
3005        let unwind_tx_from = self
3006            .block_body_indices(block)?
3007            .map(|b| b.next_tx_num())
3008            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3009
3010        // Last transaction to be removed
3011        let unwind_tx_to = self
3012            .tx
3013            .cursor_read::<tables::BlockBodyIndices>()?
3014            .last()?
3015            // shouldn't happen because this was OK above
3016            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3017            .1
3018            .last_tx_num();
3019
3020        if unwind_tx_from <= unwind_tx_to {
3021            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3022                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3023            }
3024        }
3025
3026        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
3027
3028        self.remove_bodies_above(block)?;
3029
3030        Ok(())
3031    }
3032
3033    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3034        self.storage.writer().remove_block_bodies_above(self, block)?;
3035
3036        // First transaction to be removed
3037        let unwind_tx_from = self
3038            .block_body_indices(block)?
3039            .map(|b| b.next_tx_num())
3040            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3041
3042        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3043        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3044
3045        let static_file_tx_num =
3046            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3047
3048        let to_delete = static_file_tx_num
3049            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3050            .unwrap_or_default();
3051
3052        self.static_file_provider
3053            .latest_writer(StaticFileSegment::Transactions)?
3054            .prune_transactions(to_delete, block)?;
3055
3056        Ok(())
3057    }
3058
3059    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3060    fn append_blocks_with_state(
3061        &self,
3062        blocks: Vec<RecoveredBlock<Self::Block>>,
3063        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3064        hashed_state: HashedPostStateSorted,
3065    ) -> ProviderResult<()> {
3066        if blocks.is_empty() {
3067            debug!(target: "providers::db", "Attempted to append empty block range");
3068            return Ok(())
3069        }
3070
3071        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3072        // `None`.
3073        let first_number = blocks[0].number();
3074
3075        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3076        // `None`.
3077        let last_block_number = blocks[blocks.len() - 1].number();
3078
3079        let mut durations_recorder = metrics::DurationsRecorder::default();
3080
3081        // Insert the blocks
3082        for block in blocks {
3083            self.insert_block(&block)?;
3084            durations_recorder.record_relative(metrics::Action::InsertBlock);
3085        }
3086
3087        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
3088        durations_recorder.record_relative(metrics::Action::InsertState);
3089
3090        // insert hashes and intermediate merkle nodes
3091        self.write_hashed_state(&hashed_state)?;
3092        durations_recorder.record_relative(metrics::Action::InsertHashes);
3093
3094        self.update_history_indices(first_number..=last_block_number)?;
3095        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3096
3097        // Update pipeline progress
3098        self.update_pipeline_stages(last_block_number, false)?;
3099        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3100
3101        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3102
3103        Ok(())
3104    }
3105}
3106
3107impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3108    fn get_prune_checkpoint(
3109        &self,
3110        segment: PruneSegment,
3111    ) -> ProviderResult<Option<PruneCheckpoint>> {
3112        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3113    }
3114
3115    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3116        Ok(PruneSegment::variants()
3117            .filter_map(|segment| {
3118                self.tx
3119                    .get::<tables::PruneCheckpoints>(segment)
3120                    .transpose()
3121                    .map(|chk| chk.map(|chk| (segment, chk)))
3122            })
3123            .collect::<Result<_, _>>()?)
3124    }
3125}
3126
3127impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3128    fn save_prune_checkpoint(
3129        &self,
3130        segment: PruneSegment,
3131        checkpoint: PruneCheckpoint,
3132    ) -> ProviderResult<()> {
3133        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3134    }
3135}
3136
3137impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3138    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3139        let db_entries = self.tx.entries::<T>()?;
3140        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3141            Ok(entries) => entries,
3142            Err(ProviderError::UnsupportedProvider) => 0,
3143            Err(err) => return Err(err),
3144        };
3145
3146        Ok(db_entries + static_file_entries)
3147    }
3148}
3149
3150impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3151    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3152        let mut finalized_blocks = self
3153            .tx
3154            .cursor_read::<tables::ChainState>()?
3155            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3156            .take(1)
3157            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3158
3159        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3160        Ok(last_finalized_block_number)
3161    }
3162
3163    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3164        let mut finalized_blocks = self
3165            .tx
3166            .cursor_read::<tables::ChainState>()?
3167            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3168            .take(1)
3169            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3170
3171        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3172        Ok(last_finalized_block_number)
3173    }
3174}
3175
3176impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3177    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3178        Ok(self
3179            .tx
3180            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3181    }
3182
3183    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3184        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3185    }
3186}
3187
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Borrows the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Mutably borrows the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the underlying database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns the configured prune modes.
    // NOTE(review): presumably this resolves to an inherent `prune_modes_ref` defined elsewhere
    // on `DatabaseProvider`; if no such method exists this call would recurse — confirm.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    /// Commit database transaction, static files, and pending `RocksDB` batches.
    ///
    /// The commit order depends on whether the static file provider has an unwind queued:
    /// database first when unwinding, static files first otherwise. Pending `RocksDB` batches
    /// are only committed on unix builds with the `rocksdb` feature enabled. Returns
    /// `Ok(true)` once every step succeeded; the first failing step aborts the sequence.
    fn commit(self) -> ProviderResult<bool> {
        // For unwinding it makes more sense to commit the database first, since if
        // it is interrupted before the static files commit, we can just
        // truncate the static files according to the
        // checkpoints on the next start-up.
        if self.static_file_provider.has_unwind_queued() {
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Drain the queued batches so each one is committed exactly once.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // No unwind queued: static files are committed first, then the `RocksDB` batches,
            // and the database transaction last.
            self.static_file_provider.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Drain the queued batches so each one is committed exactly once.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.tx.commit()?;
        }

        Ok(true)
    }
}
3242
3243impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3244    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3245        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3246    }
3247}
3248
3249impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3250    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3251        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3252    }
3253}
3254
3255impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3256    fn cached_storage_settings(&self) -> StorageSettings {
3257        *self.storage_settings.read()
3258    }
3259
3260    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3261        *self.storage_settings.write() = settings;
3262    }
3263}
3264
3265#[cfg(test)]
3266mod tests {
3267    use super::*;
3268    use crate::{
3269        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3270        BlockWriter,
3271    };
3272    use reth_ethereum_primitives::Receipt;
3273    use reth_testing_utils::generators::{self, random_block, BlockParams};
3274    use reth_trie::Nibbles;
3275
3276    #[test]
3277    fn test_receipts_by_block_range_empty_range() {
3278        let factory = create_test_provider_factory();
3279        let provider = factory.provider().unwrap();
3280
3281        // empty range should return empty vec
3282        let start = 10u64;
3283        let end = 9u64;
3284        let result = provider.receipts_by_block_range(start..=end).unwrap();
3285        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3286    }
3287
3288    #[test]
3289    fn test_receipts_by_block_range_nonexistent_blocks() {
3290        let factory = create_test_provider_factory();
3291        let provider = factory.provider().unwrap();
3292
3293        // non-existent blocks should return empty vecs for each block
3294        let result = provider.receipts_by_block_range(10..=12).unwrap();
3295        assert_eq!(result, vec![vec![], vec![], vec![]]);
3296    }
3297
3298    #[test]
3299    fn test_receipts_by_block_range_single_block() {
3300        let factory = create_test_provider_factory();
3301        let data = BlockchainTestData::default();
3302
3303        let provider_rw = factory.provider_rw().unwrap();
3304        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3305        provider_rw
3306            .write_state(
3307                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3308                crate::OriginalValuesKnown::No,
3309            )
3310            .unwrap();
3311        provider_rw.insert_block(&data.blocks[0].0).unwrap();
3312        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3313        provider_rw.commit().unwrap();
3314
3315        let provider = factory.provider().unwrap();
3316        let result = provider.receipts_by_block_range(1..=1).unwrap();
3317
3318        // should have one vec with one receipt
3319        assert_eq!(result.len(), 1);
3320        assert_eq!(result[0].len(), 1);
3321        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3322    }
3323
3324    #[test]
3325    fn test_receipts_by_block_range_multiple_blocks() {
3326        let factory = create_test_provider_factory();
3327        let data = BlockchainTestData::default();
3328
3329        let provider_rw = factory.provider_rw().unwrap();
3330        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3331        provider_rw
3332            .write_state(
3333                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3334                crate::OriginalValuesKnown::No,
3335            )
3336            .unwrap();
3337        for i in 0..3 {
3338            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3339            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3340        }
3341        provider_rw.commit().unwrap();
3342
3343        let provider = factory.provider().unwrap();
3344        let result = provider.receipts_by_block_range(1..=3).unwrap();
3345
3346        // should have 3 vecs, each with one receipt
3347        assert_eq!(result.len(), 3);
3348        for (i, block_receipts) in result.iter().enumerate() {
3349            assert_eq!(block_receipts.len(), 1);
3350            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3351        }
3352    }
3353
3354    #[test]
3355    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3356        let factory = create_test_provider_factory();
3357        let data = BlockchainTestData::default();
3358
3359        let provider_rw = factory.provider_rw().unwrap();
3360        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3361        provider_rw
3362            .write_state(
3363                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3364                crate::OriginalValuesKnown::No,
3365            )
3366            .unwrap();
3367
3368        // insert blocks 1-3 with receipts
3369        for i in 0..3 {
3370            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3371            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3372        }
3373        provider_rw.commit().unwrap();
3374
3375        let provider = factory.provider().unwrap();
3376        let result = provider.receipts_by_block_range(1..=3).unwrap();
3377
3378        // verify each block has one receipt
3379        assert_eq!(result.len(), 3);
3380        for block_receipts in &result {
3381            assert_eq!(block_receipts.len(), 1);
3382        }
3383    }
3384
3385    #[test]
3386    fn test_receipts_by_block_range_partial_range() {
3387        let factory = create_test_provider_factory();
3388        let data = BlockchainTestData::default();
3389
3390        let provider_rw = factory.provider_rw().unwrap();
3391        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3392        provider_rw
3393            .write_state(
3394                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3395                crate::OriginalValuesKnown::No,
3396            )
3397            .unwrap();
3398        for i in 0..3 {
3399            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3400            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3401        }
3402        provider_rw.commit().unwrap();
3403
3404        let provider = factory.provider().unwrap();
3405
3406        // request range that includes both existing and non-existing blocks
3407        let result = provider.receipts_by_block_range(2..=5).unwrap();
3408        assert_eq!(result.len(), 4);
3409
3410        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3411        assert_eq!(result[0].len(), 1); // block 2
3412        assert_eq!(result[1].len(), 1); // block 3
3413        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3414        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3415
3416        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3417        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3418    }
3419
3420    #[test]
3421    fn test_receipts_by_block_range_all_empty_blocks() {
3422        let factory = create_test_provider_factory();
3423        let mut rng = generators::rng();
3424
3425        // create blocks with no transactions
3426        let mut blocks = Vec::new();
3427        for i in 0..3 {
3428            let block =
3429                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3430            blocks.push(block);
3431        }
3432
3433        let provider_rw = factory.provider_rw().unwrap();
3434        for block in blocks {
3435            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
3436        }
3437        provider_rw.commit().unwrap();
3438
3439        let provider = factory.provider().unwrap();
3440        let result = provider.receipts_by_block_range(1..=3).unwrap();
3441
3442        assert_eq!(result.len(), 3);
3443        for block_receipts in result {
3444            assert_eq!(block_receipts.len(), 0);
3445        }
3446    }
3447
3448    #[test]
3449    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3450        let factory = create_test_provider_factory();
3451        let data = BlockchainTestData::default();
3452
3453        let provider_rw = factory.provider_rw().unwrap();
3454        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3455        provider_rw
3456            .write_state(
3457                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3458                crate::OriginalValuesKnown::No,
3459            )
3460            .unwrap();
3461        for i in 0..3 {
3462            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3463            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3464        }
3465        provider_rw.commit().unwrap();
3466
3467        let provider = factory.provider().unwrap();
3468
3469        // get receipts using block range method
3470        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3471
3472        // get receipts using individual block calls
3473        let mut individual_results = Vec::new();
3474        for block_num in 1..=3 {
3475            let receipts =
3476                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3477            individual_results.push(receipts);
3478        }
3479
3480        assert_eq!(range_result, individual_results);
3481    }
3482
    /// Checks `write_trie_changesets` against values pre-populated in the database (no overlay
    /// is passed).
    ///
    /// The test seeds `AccountsTrie` and `StoragesTrie`, writes changesets for one account trie
    /// update set and two storage tries (one normal, one flagged `is_deleted`), and then asserts
    /// both the number of written entries and the exact contents of `AccountsTrieChangeSets` and
    /// `StoragesTrieChangeSets` for the block.
    #[test]
    fn test_write_trie_changesets() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let block_number = 1u64;

        // Create some test nibbles and nodes
        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111, // state_mask
            0b0000_0000_0000_0000, // tree_mask
            0b0000_0000_0000_0000, // hash_mask
            vec![],                // hashes
            None,                  // root hash
        );

        // Pre-populate AccountsTrie with a node that will be updated (for account_nibbles1)
        {
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
        }

        // Create account trie updates: one Some (update) and one None (removal)
        let account_nodes = vec![
            (account_nibbles1, Some(node1.clone())), // This will update existing node
            (account_nibbles2, None),                // This will be a removal (no existing node)
        ];

        // Create storage trie updates
        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie

        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_0000_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0000_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create an old version of storage_node1 to prepopulate
        let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000, // Different mask to show it's an old value
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Pre-populate StoragesTrie for normal storage (storage_address1)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1) with old value
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1_old.clone(),
            };
            cursor.upsert(storage_address1, &entry).unwrap();
        }

        // Pre-populate StoragesTrie for wiped storage (storage_address2)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1)
            let entry1 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1.clone(),
            };
            cursor.upsert(storage_address2, &entry1).unwrap();
            // Add node that won't be updated but exists (storage_nibbles3)
            let entry3 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles3),
                node: storage_node2.clone(),
            };
            cursor.upsert(storage_address2, &entry3).unwrap();
        }

        // Normal storage trie: one Some (update) and one None (new)
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // This will update existing node
                (storage_nibbles2, None),                        // This is a new node
            ],
        };

        // Wiped storage trie
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // Updated node already in db
                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in db
                                                                  * storage_nibbles3 is in db
                                                                  * but not updated */
            ],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the changesets; the third argument is `None`, so no overlay is supplied and the
        // previous values come from the tables seeded above.
        let num_written =
            provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();

        // Verify number of entries written
        // Account changesets: 2 (one update, one removal)
        // Storage changesets:
        //   - Normal storage: 2 (one update, one removal)
        //   - Wiped storage: 3 (two updated, one existing not updated)
        // Total: 2 + 2 + 3 = 7
        assert_eq!(num_written, 7);

        // Verify account changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();

            // Get all entries for this block to see what was written
            let all_entries = cursor
                .walk_dup(Some(block_number), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            // Assert the full value of all_entries in a single assert_eq
            assert_eq!(
                all_entries,
                vec![
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles1),
                            // Value that was seeded into AccountsTrie before the write
                            node: Some(node1),
                        }
                    ),
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles2),
                            // Nothing was seeded for this path
                            node: None,
                        }
                    ),
                ]
            );
        }

        // Verify storage changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            // Check normal storage trie changesets
            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
            let entries1 =
                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries1,
                vec![
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old), // Old value that was prepopulated
                        }
                    ),
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // New node, no previous value
                        }
                    ),
                ]
            );

            // Check wiped storage trie changesets
            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
            let entries2 =
                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries2,
                vec![
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1), // Was in db, so has old value
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // Was not in db
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2), // Existing node in wiped storage
                        }
                    ),
                ]
            );
        }

        provider_rw.commit().unwrap();
    }
3717
3718    #[test]
3719    fn test_write_trie_changesets_with_overlay() {
3720        use reth_db_api::models::BlockNumberHashedAddress;
3721        use reth_trie::BranchNodeCompact;
3722
3723        let factory = create_test_provider_factory();
3724        let provider_rw = factory.provider_rw().unwrap();
3725
3726        let block_number = 1u64;
3727
3728        // Create some test nibbles and nodes
3729        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3730        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3731
3732        let node1 = BranchNodeCompact::new(
3733            0b1111_1111_1111_1111, // state_mask
3734            0b0000_0000_0000_0000, // tree_mask
3735            0b0000_0000_0000_0000, // hash_mask
3736            vec![],                // hashes
3737            None,                  // root hash
3738        );
3739
3740        // NOTE: Unlike the previous test, we're NOT pre-populating the database
3741        // All node values will come from the overlay
3742
3743        // Create the overlay with existing values that would normally be in the DB
3744        let node1_old = BranchNodeCompact::new(
3745            0b1010_1010_1010_1010, // Different mask to show it's the overlay "existing" value
3746            0b0000_0000_0000_0000,
3747            0b0000_0000_0000_0000,
3748            vec![],
3749            None,
3750        );
3751
3752        // Create overlay account nodes
3753        let overlay_account_nodes = vec![
3754            (account_nibbles1, Some(node1_old.clone())), // This simulates existing node in overlay
3755        ];
3756
3757        // Create account trie updates: one Some (update) and one None (removal)
3758        let account_nodes = vec![
3759            (account_nibbles1, Some(node1)), // This will update overlay node
3760            (account_nibbles2, None),        // This will be a removal (no existing node)
3761        ];
3762
3763        // Create storage trie updates
3764        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
3765        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie
3766
3767        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3768        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3769        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3770
3771        let storage_node1 = BranchNodeCompact::new(
3772            0b1111_0000_0000_0000,
3773            0b0000_0000_0000_0000,
3774            0b0000_0000_0000_0000,
3775            vec![],
3776            None,
3777        );
3778
3779        let storage_node2 = BranchNodeCompact::new(
3780            0b0000_1111_0000_0000,
3781            0b0000_0000_0000_0000,
3782            0b0000_0000_0000_0000,
3783            vec![],
3784            None,
3785        );
3786
3787        // Create old versions for overlay
3788        let storage_node1_old = BranchNodeCompact::new(
3789            0b1010_0000_0000_0000, // Different mask to show it's an old value
3790            0b0000_0000_0000_0000,
3791            0b0000_0000_0000_0000,
3792            vec![],
3793            None,
3794        );
3795
3796        // Create overlay storage nodes
3797        let mut overlay_storage_tries = B256Map::default();
3798
3799        // Overlay for normal storage (storage_address1)
3800        let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3801            is_deleted: false,
3802            storage_nodes: vec![
3803                (storage_nibbles1, Some(storage_node1_old.clone())), /* Simulates existing in
3804                                                                      * overlay */
3805            ],
3806        };
3807
3808        // Overlay for wiped storage (storage_address2)
3809        let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3810            is_deleted: false,
3811            storage_nodes: vec![
3812                (storage_nibbles1, Some(storage_node1.clone())), // Existing in overlay
3813                (storage_nibbles3, Some(storage_node2.clone())), // Also existing in overlay
3814            ],
3815        };
3816
3817        overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3818        overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3819
3820        let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3821
3822        // Normal storage trie: one Some (update) and one None (new)
3823        let storage_trie1 = StorageTrieUpdatesSorted {
3824            is_deleted: false,
3825            storage_nodes: vec![
3826                (storage_nibbles1, Some(storage_node1.clone())), // This will update overlay node
3827                (storage_nibbles2, None),                        // This is a new node
3828            ],
3829        };
3830
3831        // Wiped storage trie
3832        let storage_trie2 = StorageTrieUpdatesSorted {
3833            is_deleted: true,
3834            storage_nodes: vec![
3835                (storage_nibbles1, Some(storage_node1.clone())), // Updated node from overlay
3836                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in overlay
3837                                                                  * storage_nibbles3 is in
3838                                                                  * overlay
3839                                                                  * but not updated */
3840            ],
3841        };
3842
3843        let mut storage_tries = B256Map::default();
3844        storage_tries.insert(storage_address1, storage_trie1);
3845        storage_tries.insert(storage_address2, storage_trie2);
3846
3847        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3848
3849        // Write the changesets WITH OVERLAY
3850        let num_written =
3851            provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3852
3853        // Verify number of entries written
3854        // Account changesets: 2 (one update from overlay, one removal)
3855        // Storage changesets:
3856        //   - Normal storage: 2 (one update from overlay, one new)
3857        //   - Wiped storage: 3 (two updated, one existing from overlay not updated)
3858        // Total: 2 + 2 + 3 = 7
3859        assert_eq!(num_written, 7);
3860
3861        // Verify account changesets were written correctly
3862        {
3863            let mut cursor =
3864                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3865
3866            // Get all entries for this block to see what was written
3867            let all_entries = cursor
3868                .walk_dup(Some(block_number), None)
3869                .unwrap()
3870                .collect::<Result<Vec<_>, _>>()
3871                .unwrap();
3872
3873            // Assert the full value of all_entries in a single assert_eq
3874            assert_eq!(
3875                all_entries,
3876                vec![
3877                    (
3878                        block_number,
3879                        TrieChangeSetsEntry {
3880                            nibbles: StoredNibblesSubKey(account_nibbles1),
3881                            node: Some(node1_old), // Value from overlay, not DB
3882                        }
3883                    ),
3884                    (
3885                        block_number,
3886                        TrieChangeSetsEntry {
3887                            nibbles: StoredNibblesSubKey(account_nibbles2),
3888                            node: None,
3889                        }
3890                    ),
3891                ]
3892            );
3893        }
3894
3895        // Verify storage changesets were written correctly
3896        {
3897            let mut cursor =
3898                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3899
3900            // Check normal storage trie changesets
3901            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3902            let entries1 =
3903                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3904
3905            assert_eq!(
3906                entries1,
3907                vec![
3908                    (
3909                        key1,
3910                        TrieChangeSetsEntry {
3911                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3912                            node: Some(storage_node1_old), // Old value from overlay
3913                        }
3914                    ),
3915                    (
3916                        key1,
3917                        TrieChangeSetsEntry {
3918                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3919                            node: None, // New node, no previous value
3920                        }
3921                    ),
3922                ]
3923            );
3924
3925            // Check wiped storage trie changesets
3926            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3927            let entries2 =
3928                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3929
3930            assert_eq!(
3931                entries2,
3932                vec![
3933                    (
3934                        key2,
3935                        TrieChangeSetsEntry {
3936                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3937                            node: Some(storage_node1), // Value from overlay
3938                        }
3939                    ),
3940                    (
3941                        key2,
3942                        TrieChangeSetsEntry {
3943                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3944                            node: None, // Was not in overlay
3945                        }
3946                    ),
3947                    (
3948                        key2,
3949                        TrieChangeSetsEntry {
3950                            nibbles: StoredNibblesSubKey(storage_nibbles3),
3951                            node: Some(storage_node2), /* Existing node from overlay in wiped
3952                                                        * storage */
3953                        }
3954                    ),
3955                ]
3956            );
3957        }
3958
3959        provider_rw.commit().unwrap();
3960    }
3961
3962    #[test]
3963    fn test_clear_trie_changesets_from() {
3964        use alloy_primitives::hex_literal::hex;
3965        use reth_db_api::models::BlockNumberHashedAddress;
3966        use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3967
3968        let factory = create_test_provider_factory();
3969
3970        // Create some test data for different block numbers
3971        let block1 = 100u64;
3972        let block2 = 101u64;
3973        let block3 = 102u64;
3974        let block4 = 103u64;
3975        let block5 = 104u64;
3976
3977        // Create test addresses for storage changesets
3978        let storage_address1 =
3979            B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3980        let storage_address2 =
3981            B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3982
3983        // Create test nibbles
3984        let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3985        let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3986        let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3987
3988        // Create test nodes
3989        let node1 = BranchNodeCompact::new(
3990            0b1111_1111_1111_1111,
3991            0b1111_1111_1111_1111,
3992            0b0000_0000_0000_0001,
3993            vec![B256::from(hex!(
3994                "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3995            ))],
3996            None,
3997        );
3998        let node2 = BranchNodeCompact::new(
3999            0b1111_1111_1111_1110,
4000            0b1111_1111_1111_1110,
4001            0b0000_0000_0000_0010,
4002            vec![B256::from(hex!(
4003                "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
4004            ))],
4005            Some(B256::from(hex!(
4006                "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
4007            ))),
4008        );
4009
4010        // Populate AccountsTrieChangeSets with data across multiple blocks
4011        {
4012            let provider_rw = factory.provider_rw().unwrap();
4013            let mut cursor =
4014                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4015
4016            // Block 100: 2 entries (will be kept - before start block)
4017            cursor
4018                .upsert(
4019                    block1,
4020                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4021                )
4022                .unwrap();
4023            cursor
4024                .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4025                .unwrap();
4026
4027            // Block 101: 3 entries with duplicates (will be deleted - from this block onwards)
4028            cursor
4029                .upsert(
4030                    block2,
4031                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4032                )
4033                .unwrap();
4034            cursor
4035                .upsert(
4036                    block2,
4037                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4038                )
4039                .unwrap(); // duplicate key
4040            cursor
4041                .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4042                .unwrap();
4043
4044            // Block 102: 2 entries (will be deleted - after start block)
4045            cursor
4046                .upsert(
4047                    block3,
4048                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4049                )
4050                .unwrap();
4051            cursor
4052                .upsert(
4053                    block3,
4054                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
4055                )
4056                .unwrap();
4057
4058            // Block 103: 1 entry (will be deleted - after start block)
4059            cursor
4060                .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4061                .unwrap();
4062
4063            // Block 104: 2 entries (will be deleted - after start block)
4064            cursor
4065                .upsert(
4066                    block5,
4067                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4068                )
4069                .unwrap();
4070            cursor
4071                .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4072                .unwrap();
4073
4074            provider_rw.commit().unwrap();
4075        }
4076
4077        // Populate StoragesTrieChangeSets with data across multiple blocks
4078        {
4079            let provider_rw = factory.provider_rw().unwrap();
4080            let mut cursor =
4081                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4082
4083            // Block 100, address1: 2 entries (will be kept - before start block)
4084            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4085            cursor
4086                .upsert(
4087                    key1_block1,
4088                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4089                )
4090                .unwrap();
4091            cursor
4092                .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4093                .unwrap();
4094
4095            // Block 101, address1: 3 entries with duplicates (will be deleted - from this block
4096            // onwards)
4097            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4098            cursor
4099                .upsert(
4100                    key1_block2,
4101                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4102                )
4103                .unwrap();
4104            cursor
4105                .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4106                .unwrap(); // duplicate key
4107            cursor
4108                .upsert(
4109                    key1_block2,
4110                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4111                )
4112                .unwrap();
4113
4114            // Block 102, address2: 2 entries (will be deleted - after start block)
4115            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4116            cursor
4117                .upsert(
4118                    key2_block3,
4119                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4120                )
4121                .unwrap();
4122            cursor
4123                .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4124                .unwrap();
4125
4126            // Block 103, address1: 2 entries with duplicate (will be deleted - after start block)
4127            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4128            cursor
4129                .upsert(
4130                    key1_block4,
4131                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4132                )
4133                .unwrap();
4134            cursor
4135                .upsert(
4136                    key1_block4,
4137                    &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4138                )
4139                .unwrap(); // duplicate key
4140
4141            // Block 104, address2: 2 entries (will be deleted - after start block)
4142            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4143            cursor
4144                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4145                .unwrap();
4146            cursor
4147                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4148                .unwrap();
4149
4150            provider_rw.commit().unwrap();
4151        }
4152
4153        // Clear all changesets from block 101 onwards
4154        {
4155            let provider_rw = factory.provider_rw().unwrap();
4156            provider_rw.clear_trie_changesets_from(block2).unwrap();
4157            provider_rw.commit().unwrap();
4158        }
4159
4160        // Verify AccountsTrieChangeSets after clearing
4161        {
4162            let provider = factory.provider().unwrap();
4163            let mut cursor =
4164                provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4165
4166            // Block 100 should still exist (before range)
4167            let block1_entries = cursor
4168                .walk_dup(Some(block1), None)
4169                .unwrap()
4170                .collect::<Result<Vec<_>, _>>()
4171                .unwrap();
4172            assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4173            assert_eq!(block1_entries[0].0, block1);
4174            assert_eq!(block1_entries[1].0, block1);
4175
4176            // Blocks 101-104 should be deleted
4177            let block2_entries = cursor
4178                .walk_dup(Some(block2), None)
4179                .unwrap()
4180                .collect::<Result<Vec<_>, _>>()
4181                .unwrap();
4182            assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4183
4184            let block3_entries = cursor
4185                .walk_dup(Some(block3), None)
4186                .unwrap()
4187                .collect::<Result<Vec<_>, _>>()
4188                .unwrap();
4189            assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4190
4191            let block4_entries = cursor
4192                .walk_dup(Some(block4), None)
4193                .unwrap()
4194                .collect::<Result<Vec<_>, _>>()
4195                .unwrap();
4196            assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4197
4198            // Block 104 should also be deleted
4199            let block5_entries = cursor
4200                .walk_dup(Some(block5), None)
4201                .unwrap()
4202                .collect::<Result<Vec<_>, _>>()
4203                .unwrap();
4204            assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4205        }
4206
4207        // Verify StoragesTrieChangeSets after clearing
4208        {
4209            let provider = factory.provider().unwrap();
4210            let mut cursor =
4211                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4212
4213            // Block 100 entries should still exist (before range)
4214            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4215            let block1_entries = cursor
4216                .walk_dup(Some(key1_block1), None)
4217                .unwrap()
4218                .collect::<Result<Vec<_>, _>>()
4219                .unwrap();
4220            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4221
4222            // Blocks 101-104 entries should be deleted
4223            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4224            let block2_entries = cursor
4225                .walk_dup(Some(key1_block2), None)
4226                .unwrap()
4227                .collect::<Result<Vec<_>, _>>()
4228                .unwrap();
4229            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4230
4231            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4232            let block3_entries = cursor
4233                .walk_dup(Some(key2_block3), None)
4234                .unwrap()
4235                .collect::<Result<Vec<_>, _>>()
4236                .unwrap();
4237            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4238
4239            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4240            let block4_entries = cursor
4241                .walk_dup(Some(key1_block4), None)
4242                .unwrap()
4243                .collect::<Result<Vec<_>, _>>()
4244                .unwrap();
4245            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4246
4247            // Block 104 entries should also be deleted
4248            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4249            let block5_entries = cursor
4250                .walk_dup(Some(key2_block5), None)
4251                .unwrap()
4252                .collect::<Result<Vec<_>, _>>()
4253                .unwrap();
4254            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4255        }
4256    }
4257
4258    #[test]
4259    fn test_write_trie_updates_sorted() {
4260        use reth_trie::{
4261            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4262            BranchNodeCompact, StorageTrieEntry,
4263        };
4264
4265        let factory = create_test_provider_factory();
4266        let provider_rw = factory.provider_rw().unwrap();
4267
4268        // Pre-populate account trie with data that will be deleted
4269        {
4270            let tx = provider_rw.tx_ref();
4271            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4272
4273            // Add account node that will be deleted
4274            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4275            cursor
4276                .upsert(
4277                    to_delete,
4278                    &BranchNodeCompact::new(
4279                        0b1010_1010_1010_1010, // state_mask
4280                        0b0000_0000_0000_0000, // tree_mask
4281                        0b0000_0000_0000_0000, // hash_mask
4282                        vec![],
4283                        None,
4284                    ),
4285                )
4286                .unwrap();
4287
4288            // Add account node that will be updated
4289            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4290            cursor
4291                .upsert(
4292                    to_update,
4293                    &BranchNodeCompact::new(
4294                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4295                        0b0000_0000_0000_0000, // tree_mask
4296                        0b0000_0000_0000_0000, // hash_mask
4297                        vec![],
4298                        None,
4299                    ),
4300                )
4301                .unwrap();
4302        }
4303
4304        // Pre-populate storage tries with data
4305        let storage_address1 = B256::from([1u8; 32]);
4306        let storage_address2 = B256::from([2u8; 32]);
4307        {
4308            let tx = provider_rw.tx_ref();
4309            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4310
4311            // Add storage nodes for address1 (one will be deleted)
4312            storage_cursor
4313                .upsert(
4314                    storage_address1,
4315                    &StorageTrieEntry {
4316                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4317                        node: BranchNodeCompact::new(
4318                            0b0011_0011_0011_0011, // will be deleted
4319                            0b0000_0000_0000_0000,
4320                            0b0000_0000_0000_0000,
4321                            vec![],
4322                            None,
4323                        ),
4324                    },
4325                )
4326                .unwrap();
4327
4328            // Add storage nodes for address2 (will be wiped)
4329            storage_cursor
4330                .upsert(
4331                    storage_address2,
4332                    &StorageTrieEntry {
4333                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4334                        node: BranchNodeCompact::new(
4335                            0b1100_1100_1100_1100, // will be wiped
4336                            0b0000_0000_0000_0000,
4337                            0b0000_0000_0000_0000,
4338                            vec![],
4339                            None,
4340                        ),
4341                    },
4342                )
4343                .unwrap();
4344            storage_cursor
4345                .upsert(
4346                    storage_address2,
4347                    &StorageTrieEntry {
4348                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4349                        node: BranchNodeCompact::new(
4350                            0b0011_1100_0011_1100, // will be wiped
4351                            0b0000_0000_0000_0000,
4352                            0b0000_0000_0000_0000,
4353                            vec![],
4354                            None,
4355                        ),
4356                    },
4357                )
4358                .unwrap();
4359        }
4360
4361        // Create sorted account trie updates
4362        let account_nodes = vec![
4363            (
4364                Nibbles::from_nibbles([0x1, 0x2]),
4365                Some(BranchNodeCompact::new(
4366                    0b1111_1111_1111_1111, // state_mask (updated)
4367                    0b0000_0000_0000_0000, // tree_mask
4368                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4369                    vec![],
4370                    None,
4371                )),
4372            ),
4373            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4374            (
4375                Nibbles::from_nibbles([0x5, 0x6]),
4376                Some(BranchNodeCompact::new(
4377                    0b1111_1111_1111_1111, // state_mask
4378                    0b0000_0000_0000_0000, // tree_mask
4379                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4380                    vec![],
4381                    None,
4382                )),
4383            ),
4384        ];
4385
4386        // Create sorted storage trie updates
4387        let storage_trie1 = StorageTrieUpdatesSorted {
4388            is_deleted: false,
4389            storage_nodes: vec![
4390                (
4391                    Nibbles::from_nibbles([0x1, 0x0]),
4392                    Some(BranchNodeCompact::new(
4393                        0b1111_0000_0000_0000, // state_mask
4394                        0b0000_0000_0000_0000, // tree_mask
4395                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4396                        vec![],
4397                        None,
4398                    )),
4399                ),
4400                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4401            ],
4402        };
4403
4404        let storage_trie2 = StorageTrieUpdatesSorted {
4405            is_deleted: true, // Wipe all storage for this address
4406            storage_nodes: vec![],
4407        };
4408
4409        let mut storage_tries = B256Map::default();
4410        storage_tries.insert(storage_address1, storage_trie1);
4411        storage_tries.insert(storage_address2, storage_trie2);
4412
4413        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4414
4415        // Write the sorted trie updates
4416        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4417
4418        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4419        // storage deletion = 5
4420        assert_eq!(num_entries, 5);
4421
4422        // Verify account trie updates were written correctly
4423        let tx = provider_rw.tx_ref();
4424        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4425
4426        // Check first account node was updated
4427        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4428        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4429        assert!(entry1.is_some(), "Updated account node should exist");
4430        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4431        assert_eq!(
4432            entry1.unwrap().1.state_mask,
4433            expected_mask,
4434            "Account node should have updated state_mask"
4435        );
4436
4437        // Check deleted account node no longer exists
4438        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4439        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4440        assert!(entry2.is_none(), "Deleted account node should not exist");
4441
4442        // Check new account node exists
4443        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4444        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4445        assert!(entry3.is_some(), "New account node should exist");
4446
4447        // Verify storage trie updates were written correctly
4448        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4449
4450        // Check storage for address1
4451        let storage_entries1: Vec<_> = storage_cursor
4452            .walk_dup(Some(storage_address1), None)
4453            .unwrap()
4454            .collect::<Result<Vec<_>, _>>()
4455            .unwrap();
4456        assert_eq!(
4457            storage_entries1.len(),
4458            1,
4459            "Storage address1 should have 1 entry after deletion"
4460        );
4461        assert_eq!(
4462            storage_entries1[0].1.nibbles.0,
4463            Nibbles::from_nibbles([0x1, 0x0]),
4464            "Remaining entry should be [0x1, 0x0]"
4465        );
4466
4467        // Check storage for address2 was wiped
4468        let storage_entries2: Vec<_> = storage_cursor
4469            .walk_dup(Some(storage_address2), None)
4470            .unwrap()
4471            .collect::<Result<Vec<_>, _>>()
4472            .unwrap();
4473        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4474
4475        provider_rw.commit().unwrap();
4476    }
4477
#[test]
fn test_get_block_trie_updates() {
    use reth_db_api::models::BlockNumberHashedAddress;
    use reth_trie::{BranchNodeCompact, StorageTrieEntry};

    // Every node used below is a branch node with empty tree/hash masks and no hashes, so the
    // state mask alone is enough to tell the nodes apart.
    let branch = |state_mask: u16| BranchNodeCompact::new(state_mask, 0u16, 0u16, vec![], None);

    let factory = create_test_provider_factory();
    let provider_rw = factory.provider_rw().unwrap();

    // Block whose trie updates are queried, and the block right after it.
    let target_block = 2u64;
    let next_block = 3u64;

    // Account trie paths and the nodes associated with them.
    let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
    let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
    let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);

    let node1 = branch(0b1111_1111_0000_0000);
    let node2 = branch(0b0000_0000_1111_1111);
    let node3 = branch(0b1010_1010_1010_1010);

    // The AccountsTrie table holds the final state: nibbles1 and nibbles2 are present, while
    // nibbles3 is absent because it gets deleted in `next_block`.
    {
        let mut accounts_trie =
            provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
        for (path, node) in [(account_nibbles1, &node1), (account_nibbles2, &node2)] {
            accounts_trie.insert(StoredNibbles(path), node).unwrap();
        }
    }

    // Account trie changesets store the value a path had *before* the block was applied.
    {
        let mut account_changesets =
            provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
        let entries = [
            // nibbles1 was updated in target_block, so its old value is recorded.
            (target_block, account_nibbles1, Some(branch(0b1111_0000_0000_0000))),
            // nibbles2 was created in target_block, so there is no old value.
            (target_block, account_nibbles2, None),
            // nibbles3 was deleted in next_block; this entry exercises the overlay.
            (next_block, account_nibbles3, Some(node3)),
        ];
        for (block, path, node) in entries {
            let entry = TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node };
            account_changesets.append_dup(block, entry).unwrap();
        }
    }

    // Storage trie fixtures for a single hashed address.
    let storage_address1 = B256::from([1u8; 32]);
    let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
    let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);

    let storage_node1 = branch(0b1111_1111_1111_0000);
    let storage_node2 = branch(0b0101_0101_0101_0101);

    // The StoragesTrie table holds the final state: only storage_nibbles1 remains, since
    // storage_nibbles2 is deleted in `next_block`.
    {
        let mut storages_trie =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
        let current_entry = StorageTrieEntry {
            nibbles: StoredNibblesSubKey(storage_nibbles1),
            node: storage_node1.clone(),
        };
        storages_trie.upsert(storage_address1, &current_entry).unwrap();
    }

    // Storage trie changesets, keyed by (block, hashed address).
    {
        let mut storage_changesets =
            provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
        let entries = [
            // storage_nibbles1 was updated in target_block (old value recorded).
            (target_block, storage_nibbles1, Some(branch(0b0000_0000_1111_1111))),
            // storage_nibbles2 was created in target_block (no old value).
            (target_block, storage_nibbles2, None),
            // storage_nibbles2 was deleted in next_block; the recorded value is the node
            // that got removed. This entry exercises the overlay.
            (next_block, storage_nibbles2, Some(branch(0b0101_0101_0101_0101))),
        ];
        for (block, path, node) in entries {
            let key = BlockNumberHashedAddress((block, storage_address1));
            let entry = TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node };
            storage_changesets.append_dup(key, entry).unwrap();
        }
    }

    provider_rw.commit().unwrap();

    // Query the trie updates of `target_block` through a fresh read-only provider.
    let provider = factory.provider().unwrap();
    let result = provider.get_block_trie_updates(target_block).unwrap();

    // --- Account trie ---
    let account_updates = result.account_nodes_ref();
    assert_eq!(account_updates.len(), 2, "Should have 2 account trie updates");

    // nibbles1 must carry the current value (node1).
    let (_, nibbles1_node) = account_updates
        .iter()
        .find(|(path, _)| *path == account_nibbles1)
        .expect("Should find nibbles1");
    assert!(nibbles1_node.is_some(), "nibbles1 should have a value");
    assert_eq!(
        nibbles1_node.as_ref().unwrap().state_mask,
        node1.state_mask,
        "nibbles1 should have current value"
    );

    // nibbles2 must carry the current value (node2).
    let (_, nibbles2_node) = account_updates
        .iter()
        .find(|(path, _)| *path == account_nibbles2)
        .expect("Should find nibbles2");
    assert!(nibbles2_node.is_some(), "nibbles2 should have a value");
    assert_eq!(
        nibbles2_node.as_ref().unwrap().state_mask,
        node2.state_mask,
        "nibbles2 should have current value"
    );

    // nibbles3 changed in next_block rather than target_block, so it must be absent.
    assert!(
        account_updates.iter().all(|(path, _)| *path != account_nibbles3),
        "nibbles3 should not be in target_block updates"
    );

    // --- Storage trie ---
    let storage_tries = result.storage_tries_ref();
    assert_eq!(storage_tries.len(), 1, "Should have 1 storage trie");
    let storage_updates =
        storage_tries.get(&storage_address1).expect("Should have storage updates for address1");

    assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");

    // storage_nibbles1 must carry the current value.
    let (_, storage1_node) = storage_updates
        .storage_nodes
        .iter()
        .find(|(path, _)| *path == storage_nibbles1)
        .expect("Should find storage_nibbles1");
    assert!(storage1_node.is_some(), "storage_nibbles1 should have a value");
    assert_eq!(
        storage1_node.as_ref().unwrap().state_mask,
        storage_node1.state_mask,
        "storage_nibbles1 should have current value"
    );

    // storage_nibbles2 was created in target_block and is deleted again in next_block, so
    // the update must carry the node that the later block removes.
    let (_, storage2_node) = storage_updates
        .storage_nodes
        .iter()
        .find(|(path, _)| *path == storage_nibbles2)
        .expect("Should find storage_nibbles2");
    assert!(
        storage2_node.is_some(),
        "storage_nibbles2 should have a value (the node that will be deleted in next block)"
    );
    assert_eq!(
        storage2_node.as_ref().unwrap().state_mask,
        storage_node2.state_mask,
        "storage_nibbles2 should have the value that was created and will be deleted"
    );
}
4750
#[test]
fn test_prunable_receipts_logic() {
    // Prune configuration that only touches receipts: prune everything before `block`.
    let receipts_pruned_before = |block: u64| PruneModes {
        receipts: Some(PruneMode::Before(block)),
        ..Default::default()
    };

    // Inserts random blocks `0..=tip_block`, each with `tx_count` transactions.
    let insert_blocks = |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
        let mut rng = generators::rng();
        for number in 0..=tip_block {
            let params = BlockParams { tx_count: Some(tx_count), ..Default::default() };
            let recovered = random_block(&mut rng, number, params).try_recover().unwrap();
            provider_rw.insert_block(&recovered).unwrap();
        }
    };

    // Writes an execution outcome holding a single receipt for `block` and commits it.
    let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
        let receipt = Receipt {
            tx_type: Default::default(),
            success: true,
            // The block number doubles as an identifier to assert against later.
            cumulative_gas_used: block,
            logs: vec![],
        };
        let outcome = ExecutionOutcome {
            first_block: block,
            receipts: vec![vec![receipt]],
            ..Default::default()
        };
        provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
        provider_rw.commit().unwrap();
    };

    // Legacy mode (receipts in DB) - should be prunable
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::legacy());
        let factory = factory.with_prune_modes(receipts_pruned_before(100));

        let tip_block = 200u64;
        let first_block = 1u64;

        // Build the chain.
        let provider_rw = factory.provider_rw().unwrap();
        insert_blocks(&provider_rw, tip_block, 1);
        provider_rw.commit().unwrap();

        // One receipt close to genesis, one right below the tip.
        for block in [first_block, tip_block - 1] {
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(100);
            write_receipts(provider_rw, block);
        }

        let provider = factory.provider().unwrap();

        for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
            let receipts = provider.receipts_by_block(block.into()).unwrap();
            assert!(receipts.is_some_and(|r| r.len() == num_receipts));
        }
    }

    // Static files mode
    {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(
            StorageSettings::legacy().with_receipts_in_static_files(true),
        );
        let factory = factory.with_prune_modes(receipts_pruned_before(2));

        let tip_block = 200u64;

        // Build the chain.
        let provider_rw = factory.provider_rw().unwrap();
        insert_blocks(&provider_rw, tip_block, 1);
        provider_rw.commit().unwrap();

        // Highest transaction number present in the receipts static-file segment.
        let highest_receipt_tx = || {
            factory.static_file_provider().get_highest_static_file_tx(StaticFileSegment::Receipts)
        };

        // Attempt to write receipts for block 0 and 1 (should be skipped)
        for block in [0, 1] {
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(100);
            write_receipts(provider_rw, block);
        }

        assert!(highest_receipt_tx().is_none());
        assert!(matches!(
            factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts),
            Some(1)
        ));

        // With prune mode Before(2), the receipt of block 2 is the first one that should be
        // written to static files.
        write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
        assert!(highest_receipt_tx().is_some_and(|num| num == 2));

        // Once a receipt lives in static files, try to skip the next one by changing the prune
        // mode. It should NOT be skipped and the receipt should still be written, since static
        // files do not support gaps.
        let factory = factory.with_prune_modes(receipts_pruned_before(100));
        let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
        assert!(PruneMode::Distance(1).should_prune(3, tip_block));
        write_receipts(provider_rw, 3);

        // Ensure we can only fetch the 2 last receipts.
        //
        // The setup has exactly 1 tx per block, and each receipt's cumulative_gas_used is set
        // to its block number, which makes the receipts easy to identify.
        let provider = factory.provider().unwrap();
        assert!(EitherWriter::receipts_destination(&provider).is_static_file());
        for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
            let block_receipts = provider.receipts_by_block(num.into()).unwrap();
            assert!(block_receipts.is_some_and(|r| r.len() == num_receipts));

            let receipt = provider.receipt(num).unwrap();
            match num_receipts {
                0 => assert!(receipt.is_none()),
                _ => assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num)),
            }
        }
    }
}
4888}