reth_provider/providers/database/provider.rs

1use crate::{
2    changesets_utils::{
3        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4    },
5    providers::{
6        database::{chain::ChainStorage, metrics},
7        rocksdb::RocksDBProvider,
8        static_file::StaticFileWriter,
9        NodeTypesForProvider, StaticFileProvider,
10    },
11    to_range,
12    traits::{
13        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
14    },
15    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
16    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
17    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
18    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
19    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
20    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
21    RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter,
22    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
23    TransactionsProvider, TransactionsProviderExt, TrieReader, TrieWriter,
24};
25use alloy_consensus::{
26    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
27    BlockHeader, TxReceipt,
28};
29use alloy_eips::BlockHashOrNumber;
30use alloy_primitives::{
31    keccak256,
32    map::{hash_map, B256Map, HashMap, HashSet},
33    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
34};
35use itertools::Itertools;
36use parking_lot::RwLock;
37use rayon::slice::ParallelSliceMut;
38use reth_chain_state::ExecutedBlock;
39use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
40use reth_db_api::{
41    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
42    database::Database,
43    models::{
44        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
45        BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
46    },
47    table::Table,
48    tables,
49    transaction::{DbTx, DbTxMut},
50    BlockNumberList, PlainAccountState, PlainStorageState,
51};
52use reth_execution_types::{Chain, ExecutionOutcome};
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives_traits::{
55    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
65    TryIntoHistoricalStateProvider,
66};
67use reth_storage_errors::provider::ProviderResult;
68use reth_trie::{
69    trie_cursor::{
70        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
71        TrieCursorIter,
72    },
73    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
74    HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
75};
76use reth_trie_db::{
77    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
78};
79use revm_database::states::{
80    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
81};
82use std::{
83    cmp::Ordering,
84    collections::{BTreeMap, BTreeSet},
85    fmt::Debug,
86    ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
87    sync::Arc,
88    time::{Duration, Instant},
89};
90use tracing::{debug, trace};
91
/// A [`DatabaseProvider`] that holds a read-only database transaction (`Database::TX`).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
94
/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
/// Once that issue is solved, we can probably revert back to being an alias type.
///
/// The inner provider is reachable via [`Deref`]/[`DerefMut`]/[`AsRef`] impls below.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
103
104impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
105    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
106
107    fn deref(&self) -> &Self::Target {
108        &self.0
109    }
110}
111
112impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
113    fn deref_mut(&mut self) -> &mut Self::Target {
114        &mut self.0
115    }
116}
117
118impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
119    for DatabaseProviderRW<DB, N>
120{
121    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
122        &self.0
123    }
124}
125
126impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
127    /// Commit database transaction and static file if it exists.
128    pub fn commit(self) -> ProviderResult<bool> {
129        self.0.commit()
130    }
131
132    /// Consume `DbTx` or `DbTxMut`.
133    pub fn into_tx(self) -> <DB as Database>::TXMut {
134        self.0.into_tx()
135    }
136
137    /// Override the minimum pruning distance for testing purposes.
138    #[cfg(any(test, feature = "test-utils"))]
139    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
140        self.0.minimum_pruning_distance = distance;
141        self
142    }
143}
144
145impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
146    for DatabaseProvider<<DB as Database>::TXMut, N>
147{
148    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
149        provider.0
150    }
151}
152
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Pending `RocksDB` batches to be committed at provider commit time.
    ///
    /// Only present on unix with the `rocksdb` feature; batches are queued via
    /// `set_pending_rocksdb_batch` and flushed together with the MDBX transaction.
    #[cfg(all(unix, feature = "rocksdb"))]
    pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
    /// Minimum distance from tip required for pruning
    minimum_pruning_distance: u64,
    /// Database provider metrics
    metrics: metrics::DatabaseProviderMetrics,
}
178
179impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        let mut s = f.debug_struct("DatabaseProvider");
182        s.field("tx", &self.tx)
183            .field("chain_spec", &self.chain_spec)
184            .field("static_file_provider", &self.static_file_provider)
185            .field("prune_modes", &self.prune_modes)
186            .field("storage", &self.storage)
187            .field("storage_settings", &self.storage_settings)
188            .field("rocksdb_provider", &self.rocksdb_provider);
189        #[cfg(all(unix, feature = "rocksdb"))]
190        s.field("pending_rocksdb_batches", &"<pending batches>");
191        s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
192    }
193}
194
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns reference to prune modes.
    ///
    /// Available for any transaction type since it only reads provider configuration.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}
201
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// State provider for latest state
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Storage provider for state at that given block hash
    ///
    /// Returns a latest-state provider when the hash resolves to the current tip, otherwise a
    /// historical provider capped by any account/storage history prune checkpoints.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        // Fast path: if this is both the best and the last block on disk, latest state suffices.
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If we pruned account or storage history, we can't return state on every historical block.
        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    /// Sets the prune modes for provider.
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
258
// Exposes the node's primitive types (block, header, receipt, …) via the node-type bundle.
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}
262
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a mutable writer for the given static file segment, positioned at `block`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
277
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
    /// Returns the `RocksDB` provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Queues a `RocksDB` write batch to be committed together with this provider.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
        self.pending_rocksdb_batches.lock().push(batch);
    }
}
289
290impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
291    for DatabaseProvider<TX, N>
292{
293    type ChainSpec = N::ChainSpec;
294
295    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
296        self.chain_spec.clone()
297    }
298}
299
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-write transaction.
    ///
    /// `minimum_pruning_distance` starts at [`MINIMUM_PRUNING_DISTANCE`] and can be
    /// overridden in tests via `with_minimum_pruning_distance`.
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            // No batches are pending until a write path registers one.
            #[cfg(all(unix, feature = "rocksdb"))]
            pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }
}
326
// Identity conversion so generic code can take `impl AsRef<DatabaseProvider<_, _>>`.
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}
332
333impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
334    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
335    ///
336    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
337    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
338    where
339        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
340    {
341        #[cfg(all(unix, feature = "rocksdb"))]
342        let rocksdb = self.rocksdb_provider();
343        #[cfg(all(unix, feature = "rocksdb"))]
344        let rocksdb_batch = rocksdb.batch();
345        #[cfg(not(all(unix, feature = "rocksdb")))]
346        let rocksdb_batch = ();
347
348        let (result, raw_batch) = f(rocksdb_batch)?;
349
350        #[cfg(all(unix, feature = "rocksdb"))]
351        if let Some(batch) = raw_batch {
352            self.set_pending_rocksdb_batch(batch);
353        }
354        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
355
356        Ok(result)
357    }
358
359    /// Writes executed blocks and state to storage.
360    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
361        if blocks.is_empty() {
362            debug!(target: "providers::db", "Attempted to write empty block range");
363            return Ok(())
364        }
365
366        // NOTE: checked non-empty above
367        let first_block = blocks.first().unwrap().recovered_block();
368
369        let last_block = blocks.last().unwrap().recovered_block();
370        let first_number = first_block.number();
371        let last_block_number = last_block.number();
372
373        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
374
375        // Accumulate durations for each step
376        let mut total_insert_block = Duration::ZERO;
377        let mut total_write_state = Duration::ZERO;
378        let mut total_write_hashed_state = Duration::ZERO;
379        let mut total_write_trie_changesets = Duration::ZERO;
380        let mut total_write_trie_updates = Duration::ZERO;
381
382        // TODO: Do performant / batched writes for each type of object
383        // instead of a loop over all blocks,
384        // meaning:
385        //  * blocks
386        //  * state
387        //  * hashed state
388        //  * trie updates (cannot naively extend, need helper)
389        //  * indices (already done basically)
390        // Insert the blocks
391        for block in blocks {
392            let trie_data = block.trie_data();
393            let ExecutedBlock { recovered_block, execution_output, .. } = block;
394            let block_number = recovered_block.number();
395
396            let start = Instant::now();
397            self.insert_block(&recovered_block)?;
398            total_insert_block += start.elapsed();
399
400            // Write state and changesets to the database.
401            // Must be written after blocks because of the receipt lookup.
402            let start = Instant::now();
403            self.write_state(&execution_output, OriginalValuesKnown::No)?;
404            total_write_state += start.elapsed();
405
406            // insert hashes and intermediate merkle nodes
407            let start = Instant::now();
408            self.write_hashed_state(&trie_data.hashed_state)?;
409            total_write_hashed_state += start.elapsed();
410
411            let start = Instant::now();
412            self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
413            total_write_trie_changesets += start.elapsed();
414
415            let start = Instant::now();
416            self.write_trie_updates_sorted(&trie_data.trie_updates)?;
417            total_write_trie_updates += start.elapsed();
418        }
419
420        // update history indices
421        let start = Instant::now();
422        self.update_history_indices(first_number..=last_block_number)?;
423        let duration_update_history_indices = start.elapsed();
424
425        // Update pipeline progress
426        let start = Instant::now();
427        self.update_pipeline_stages(last_block_number, false)?;
428        let duration_update_pipeline_stages = start.elapsed();
429
430        // Record all metrics at the end
431        self.metrics.record_duration(metrics::Action::SaveBlocksInsertBlock, total_insert_block);
432        self.metrics.record_duration(metrics::Action::SaveBlocksWriteState, total_write_state);
433        self.metrics
434            .record_duration(metrics::Action::SaveBlocksWriteHashedState, total_write_hashed_state);
435        self.metrics.record_duration(
436            metrics::Action::SaveBlocksWriteTrieChangesets,
437            total_write_trie_changesets,
438        );
439        self.metrics
440            .record_duration(metrics::Action::SaveBlocksWriteTrieUpdates, total_write_trie_updates);
441        self.metrics.record_duration(
442            metrics::Action::SaveBlocksUpdateHistoryIndices,
443            duration_update_history_indices,
444        );
445        self.metrics.record_duration(
446            metrics::Action::SaveBlocksUpdatePipelineStages,
447            duration_update_pipeline_stages,
448        );
449
450        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
451
452        Ok(())
453    }
454
455    /// Unwinds trie state starting at and including the given block.
456    ///
457    /// This includes calculating the resulted state root and comparing it with the parent block
458    /// state root.
459    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
460        let changed_accounts = self
461            .tx
462            .cursor_read::<tables::AccountChangeSets>()?
463            .walk_range(from..)?
464            .collect::<Result<Vec<_>, _>>()?;
465
466        // Unwind account hashes.
467        self.unwind_account_hashing(changed_accounts.iter())?;
468
469        // Unwind account history indices.
470        self.unwind_account_history_indices(changed_accounts.iter())?;
471
472        let storage_start = BlockNumberAddress((from, Address::ZERO));
473        let changed_storages = self
474            .tx
475            .cursor_read::<tables::StorageChangeSets>()?
476            .walk_range(storage_start..)?
477            .collect::<Result<Vec<_>, _>>()?;
478
479        // Unwind storage hashes.
480        self.unwind_storage_hashing(changed_storages.iter().copied())?;
481
482        // Unwind storage history indices.
483        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
484
485        // Unwind accounts/storages trie tables using the revert.
486        let trie_revert = self.trie_reverts(from)?;
487        self.write_trie_updates_sorted(&trie_revert)?;
488
489        // Clear trie changesets which have been unwound.
490        self.clear_trie_changesets_from(from)?;
491
492        Ok(())
493    }
494
495    /// Removes receipts from all transactions starting with provided number (inclusive).
496    fn remove_receipts_from(
497        &self,
498        from_tx: TxNumber,
499        last_block: BlockNumber,
500    ) -> ProviderResult<()> {
501        // iterate over block body and remove receipts
502        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
503
504        if EitherWriter::receipts_destination(self).is_static_file() {
505            let static_file_receipt_num =
506                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
507
508            let to_delete = static_file_receipt_num
509                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
510                .unwrap_or_default();
511
512            self.static_file_provider
513                .latest_writer(StaticFileSegment::Receipts)?
514                .prune_receipts(to_delete, last_block)?;
515        }
516
517        Ok(())
518    }
519}
520
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    /// Consumes the provider and returns a state provider at `block_number`, falling back to a
    /// latest-state provider when that block is the current best block.
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        // if the block number is the same as the currently best block number on disk we can use the
        // latest state provider here
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // If we pruned account or storage history, we can't return state on every historical block.
        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}
562
563/// For a given key, unwind all history shards that contain block numbers at or above the given
564/// block number.
565///
566/// S - Sharded key subtype.
567/// T - Table to walk over.
568/// C - Cursor implementation.
569///
570/// This function walks the entries from the given start key and deletes all shards that belong to
571/// the key and contain block numbers at or above the given block number. Shards entirely below
572/// the block number are preserved.
573///
574/// The boundary shard (the shard that spans across the block number) is removed from the database.
575/// Any indices that are below the block number are filtered out and returned for reinsertion.
576/// The boundary shard is returned for reinsertion (if it's not empty).
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Start from the given key and iterate through shards
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, break.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Always delete the current shard from the database first
        // We'll decide later what (if anything) to reinsert
        cursor.delete_current()?;

        // Get the first (lowest) block number in this shard
        // All block numbers in a shard are sorted in ascending order
        let first = list.iter().next().expect("List can't be empty");

        // Case 1: Entire shard is at or above the unwinding point
        // Keep it deleted (don't return anything for reinsertion)
        if first >= block_number {
            // Step backwards: shards for the same key are ordered by highest block number,
            // so the previous entry is the next-older shard to examine.
            item = cursor.prev()?;
            continue
        }
        // Case 2: This is a boundary shard (spans across the unwinding point)
        // The shard contains some blocks below and some at/above the unwinding point
        else if block_number <= sharded_key.as_ref().highest_block_number {
            // Return only the block numbers that are below the unwinding point
            // These will be reinserted to preserve the historical data
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Case 3: Entire shard is below the unwinding point
        // Return all block numbers for reinsertion (preserve entire shard)
        // (reachable only when walking backwards from a fully-deleted newer shard)
        return Ok(list.iter().collect::<Vec<_>>())
    }

    // No shards found or all processed
    Ok(Vec::new())
}
625
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            // No batches are pending until a write path registers one.
            #[cfg(all(unix, feature = "rocksdb"))]
            pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Consume `DbTx` or `DbTxMut`.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Pass `DbTx` or `DbTxMut` mutable reference.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Pass `DbTx` or `DbTxMut` immutable reference.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }

    /// Executes a closure with a `RocksDB` transaction for reading.
    ///
    /// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
    /// Without the `rocksdb` feature the closure receives `()` instead of a transaction ref.
    fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
    where
        F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb = self.rocksdb_provider();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_tx = rocksdb.tx();
        // Keep the transaction alive on the stack; only a reference is handed to the closure.
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_tx_ref = &rocksdb_tx;
        #[cfg(not(all(unix, feature = "rocksdb")))]
        let rocksdb_tx_ref = ();

        f(rocksdb_tx_ref)
    }
}
691
692impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Fetches a single block identified by hash or number and assembles it via the provided
    /// closures.
    ///
    /// `header_by_number` resolves the header; `construct_block` combines header, body and
    /// recovered senders into the final block type. Returns `Ok(None)` when the block, header
    /// or body indices are missing.
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // Get the block body
        //
        // If the body indices are not found, this means that the transactions either do not exist
        // in the database yet, or they do exit but are not indexed. If they exist but are not
        // indexed, we don't have enough information to return the block anyways, so we return
        // `None`.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        // Empty blocks have no transactions and therefore no senders to recover.
        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }
733
734    /// Returns a range of blocks from the database.
735    ///
736    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
737    /// construct blocks from the following inputs:
738    ///     – Header
739    ///     - Range of transaction numbers
740    ///     – Ommers
741    ///     – Withdrawals
742    ///     – Senders
743    fn block_range<F, H, HF, R>(
744        &self,
745        range: RangeInclusive<BlockNumber>,
746        headers_range: HF,
747        mut assemble_block: F,
748    ) -> ProviderResult<Vec<R>>
749    where
750        H: AsRef<HeaderTy<N>>,
751        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
752        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
753    {
754        if range.is_empty() {
755            return Ok(Vec::new())
756        }
757
758        let len = range.end().saturating_sub(*range.start()) as usize;
759        let mut blocks = Vec::with_capacity(len);
760
761        let headers = headers_range(range.clone())?;
762
763        // If the body indices are not found, this means that the transactions either do
764        // not exist in the database yet, or they do exit but are
765        // not indexed. If they exist but are not indexed, we don't
766        // have enough information to return the block anyways, so
767        // we skip the block.
768        let present_headers = self
769            .block_body_indices_range(range)?
770            .into_iter()
771            .map(|b| b.tx_num_range())
772            .zip(headers)
773            .collect::<Vec<_>>();
774
775        let mut inputs = Vec::with_capacity(present_headers.len());
776        for (tx_range, header) in &present_headers {
777            let transactions = if tx_range.is_empty() {
778                Vec::new()
779            } else {
780                self.transactions_by_tx_range(tx_range.clone())?
781            };
782
783            inputs.push((header.as_ref(), transactions));
784        }
785
786        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
787
788        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
789            blocks.push(assemble_block(header, body, tx_range)?);
790        }
791
792        Ok(blocks)
793    }
794
795    /// Returns a range of blocks from the database, along with the senders of each
796    /// transaction in the blocks.
797    ///
798    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
799    /// construct blocks from the following inputs:
800    ///     – Header
801    ///     - Transactions
802    ///     – Ommers
803    ///     – Withdrawals
804    ///     – Senders
805    fn block_with_senders_range<H, HF, B, BF>(
806        &self,
807        range: RangeInclusive<BlockNumber>,
808        headers_range: HF,
809        assemble_block: BF,
810    ) -> ProviderResult<Vec<B>>
811    where
812        H: AsRef<HeaderTy<N>>,
813        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
814        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
815    {
816        self.block_range(range, headers_range, |header, body, tx_range| {
817            let senders = if tx_range.is_empty() {
818                Vec::new()
819            } else {
820                let known_senders: HashMap<TxNumber, Address> =
821                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
822
823                let mut senders = Vec::with_capacity(body.transactions().len());
824                for (tx_num, tx) in tx_range.zip(body.transactions()) {
825                    match known_senders.get(&tx_num) {
826                        None => {
827                            // recover the sender from the transaction if not found
828                            let sender = tx.recover_signer_unchecked()?;
829                            senders.push(sender);
830                        }
831                        Some(sender) => senders.push(*sender),
832                    }
833                }
834
835                senders
836            };
837
838            assemble_block(header, body, senders)
839        })
840    }
841
    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
    /// account changesets.
    ///
    /// The changesets carry the *old* (pre-change) values; the plain-state cursors supply the
    /// *current* values, so together they describe both the aggregated state transition and the
    /// per-block reverts.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Iterate previous values and fetch the plain-state value to create the changeset.
        // The double `Option` around `Account` represents whether the account state is known
        // (outer option) and whether the account was removed (inner option).
        let mut state: BundleStateInit = HashMap::default();

        // NOTE(review): this is not correct for blocks that are not at the tip, as the plain
        // state is not the state at the end of the range. We should rename the functions or add
        // support for accessing historical state. Accessing historical state can be tricky but
        // we are not gaining anything.

        let mut reverts: RevertsInit = HashMap::default();

        // Add account changeset changes. Iterating in reverse ensures the *oldest* changeset
        // entry for an address wins as the pre-state (`old_info`) value.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First time seeing this address: pair the old value with the current
                    // plain-state value.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // `seek_by_key_subkey` may land on the next subkey, so only accept an
                    // exact key match; a missing slot defaults to the zero value.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Older changeset entry wins as the pre-state storage value.
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
922}
923
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Insert history index to the database.
    ///
    /// For each updated partial key, this function retrieves the last shard from the database
    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
    /// the shards back into the database.
    ///
    /// This function is used by history indexing stages.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
        // will append duplicate entries instead of updating existing ones, causing data corruption.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // The shard keyed with `u64::MAX` is the "open" shard that receives new indices;
            // if it doesn't exist yet, start from an empty list.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            // fast path: all indices fit in one shard
            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // slow path: rechunk into multiple shards
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                // Closed (full) shards are keyed by their highest contained block number, so
                // lookups can seek to the right shard; only the final shard keeps `u64::MAX`.
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Insert last list with `u64::MAX`.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
982
983impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
984    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
985        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
986    }
987}
988
989impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
990    fn changed_accounts_with_range(
991        &self,
992        range: RangeInclusive<BlockNumber>,
993    ) -> ProviderResult<BTreeSet<Address>> {
994        let mut reader = EitherReader::new_account_changesets(self)?;
995
996        reader.changed_accounts_with_range(range)
997    }
998
999    fn basic_accounts(
1000        &self,
1001        iter: impl IntoIterator<Item = Address>,
1002    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1003        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1004        Ok(iter
1005            .into_iter()
1006            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
1007            .collect::<Result<Vec<_>, _>>()?)
1008    }
1009
1010    fn changed_accounts_and_blocks_with_range(
1011        &self,
1012        range: RangeInclusive<BlockNumber>,
1013    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1014        let highest_static_block = self
1015            .static_file_provider
1016            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1017
1018        if let Some(highest) = highest_static_block &&
1019            self.cached_storage_settings().account_changesets_in_static_files
1020        {
1021            let start = *range.start();
1022            let static_end = (*range.end()).min(highest + 1);
1023
1024            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1025            if start <= static_end {
1026                for block in start..=static_end {
1027                    let block_changesets = self.account_block_changeset(block)?;
1028                    for changeset in block_changesets {
1029                        changed_accounts_and_blocks
1030                            .entry(changeset.address)
1031                            .or_default()
1032                            .push(block);
1033                    }
1034                }
1035            }
1036
1037            Ok(changed_accounts_and_blocks)
1038        } else {
1039            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1040
1041            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1042                BTreeMap::new(),
1043                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1044                    let (index, account) = entry?;
1045                    accounts.entry(account.address).or_default().push(index);
1046                    Ok(accounts)
1047                },
1048            )?;
1049
1050            Ok(account_transitions)
1051        }
1052    }
1053}
1054
1055impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1056    fn storage_changeset(
1057        &self,
1058        block_number: BlockNumber,
1059    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1060        let range = block_number..=block_number;
1061        let storage_range = BlockNumberAddress::range(range);
1062        self.tx
1063            .cursor_dup_read::<tables::StorageChangeSets>()?
1064            .walk_range(storage_range)?
1065            .map(|result| -> ProviderResult<_> { Ok(result?) })
1066            .collect()
1067    }
1068}
1069
1070impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1071    fn account_block_changeset(
1072        &self,
1073        block_number: BlockNumber,
1074    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1075        if self.cached_storage_settings().account_changesets_in_static_files {
1076            let static_changesets =
1077                self.static_file_provider.account_block_changeset(block_number)?;
1078            Ok(static_changesets)
1079        } else {
1080            let range = block_number..=block_number;
1081            self.tx
1082                .cursor_read::<tables::AccountChangeSets>()?
1083                .walk_range(range)?
1084                .map(|result| -> ProviderResult<_> {
1085                    let (_, account_before) = result?;
1086                    Ok(account_before)
1087                })
1088                .collect()
1089        }
1090    }
1091
1092    fn get_account_before_block(
1093        &self,
1094        block_number: BlockNumber,
1095        address: Address,
1096    ) -> ProviderResult<Option<AccountBeforeTx>> {
1097        if self.cached_storage_settings().account_changesets_in_static_files {
1098            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1099        } else {
1100            self.tx
1101                .cursor_dup_read::<tables::AccountChangeSets>()?
1102                .seek_by_key_subkey(block_number, address)?
1103                .filter(|acc| acc.address == address)
1104                .map(Ok)
1105                .transpose()
1106        }
1107    }
1108
1109    fn account_changesets_range(
1110        &self,
1111        range: impl core::ops::RangeBounds<BlockNumber>,
1112    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1113        let range = to_range(range);
1114        let mut changesets = Vec::new();
1115        if self.cached_storage_settings().account_changesets_in_static_files &&
1116            let Some(highest) = self
1117                .static_file_provider
1118                .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1119        {
1120            let static_end = range.end.min(highest + 1);
1121            if range.start < static_end {
1122                for block in range.start..static_end {
1123                    let block_changesets = self.account_block_changeset(block)?;
1124                    for changeset in block_changesets {
1125                        changesets.push((block, changeset));
1126                    }
1127                }
1128            }
1129        } else {
1130            // Fetch from database for blocks not in static files
1131            let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1132            for entry in cursor.walk_range(range)? {
1133                let (block_num, account_before) = entry?;
1134                changesets.push((block_num, account_before));
1135            }
1136        }
1137
1138        Ok(changesets)
1139    }
1140
1141    fn account_changeset_count(&self) -> ProviderResult<usize> {
1142        // check if account changesets are in static files, otherwise just count the changeset
1143        // entries in the DB
1144        if self.cached_storage_settings().account_changesets_in_static_files {
1145            self.static_file_provider.account_changeset_count()
1146        } else {
1147            Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1148        }
1149    }
1150}
1151
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the local chain tip header at `highest_uninterrupted_block`, first repairing any
    /// database <-> static-file height mismatch left behind by an interrupted run.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure Headers static file is at the same height. If it's further, this
        // input execution was interrupted previously and we need to unwind the static file.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The node shutdown between an executed static file commit and before the database
            // commit, so we need to unwind the static files.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                // Prune exactly the headers written past the database's view of the chain.
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Since this is a database <-> static file inconsistency, we commit the change
                // straight away.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There's either missing or corrupted files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1196
1197impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1198    type Header = HeaderTy<N>;
1199
1200    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1201        if let Some(num) = self.block_number(block_hash)? {
1202            Ok(self.header_by_number(num)?)
1203        } else {
1204            Ok(None)
1205        }
1206    }
1207
1208    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1209        self.static_file_provider.header_by_number(num)
1210    }
1211
1212    fn headers_range(
1213        &self,
1214        range: impl RangeBounds<BlockNumber>,
1215    ) -> ProviderResult<Vec<Self::Header>> {
1216        self.static_file_provider.headers_range(range)
1217    }
1218
1219    fn sealed_header(
1220        &self,
1221        number: BlockNumber,
1222    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1223        self.static_file_provider.sealed_header(number)
1224    }
1225
1226    fn sealed_headers_while(
1227        &self,
1228        range: impl RangeBounds<BlockNumber>,
1229        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1230    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1231        self.static_file_provider.sealed_headers_while(range, predicate)
1232    }
1233}
1234
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Returns the canonical block hash for `number`; hashes are served from static files.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Returns the canonical hashes for `start..end`; hashes are served from static files.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1248
1249impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1250    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1251        let best_number = self.best_block_number()?;
1252        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1253        Ok(ChainInfo { best_hash, best_number })
1254    }
1255
1256    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1257        // The best block number is tracked via the finished stage which gets updated in the same tx
1258        // when new blocks committed
1259        Ok(self
1260            .get_stage_checkpoint(StageId::Finish)?
1261            .map(|checkpoint| checkpoint.block_number)
1262            .unwrap_or_default())
1263    }
1264
1265    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1266        self.static_file_provider.last_block_number()
1267    }
1268
1269    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1270        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1271    }
1272}
1273
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Looks a block up by hash; only the canonical source is backed by this provider, so any
    /// other [`BlockSource`] yields `None`.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Returns the block with matching number from database.
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? &&
            let Some(header) = self.header_by_number(number)?
        {
            // If the body indices are not found, this means that the transactions either do not
            // exist in the database yet, or they do exit but are not indexed.
            // If they exist but are not indexed, we don't have enough
            // information to return the block anyways, so we return `None`.
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// The database provider stores only canonical data, so there is never a pending block.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// The database provider stores only canonical data, so there is never a pending block.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block with senders with matching number or hash from database.
    ///
    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
    /// spot, and we want fast querying.**
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Delegates to the private `recovered_block` helper with an unsealed-header fetcher and
        // a constructor that attaches the recovered senders.
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Same as [`Self::recovered_block`], but fetches the sealed header so the returned block
    /// carries its hash.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the blocks in `range`, built from unsealed headers; blocks with missing body
    /// indices are skipped (see the private `block_range` helper).
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Returns the blocks in `range` with their recovered senders attached.
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the sealed blocks in `range` with their senders; uses the checked sender
    /// attachment (unlike `block_with_senders_range`, which uses the unchecked variant).
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Finds the block containing transaction number `id` via the [`tables::TransactionBlocks`]
    /// index (a seek returns the first entry at or after `id`).
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1425
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Recovers transaction hashes by walking through `Transactions` table and
    /// calculating them in a parallel manner. Returned unsorted.
    ///
    /// Delegates to the static-file provider, which holds the transaction data.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1438
// Transaction lookups for the database provider: transaction bodies are served from static
// files, while hash→number and sender data may live in the database or other backends
// depending on storage settings.
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    /// Resolves a transaction hash to its transaction number via the hash→number index
    /// (read through `EitherReader`, which picks the backend based on configuration).
    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        self.with_rocksdb_tx(|tx_ref| {
            let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
            reader.get_transaction_hash_number(tx_hash)
        })
    }

    /// Transactions are served from static files.
    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    /// Like [`Self::transaction_by_id`] but skips hash computation — served from static files.
    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    /// Hash lookup composed of `transaction_id` + `transaction_by_id_unhashed`.
    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    /// Returns the transaction together with its [`TransactionMeta`] (block position data);
    /// `None` if any of the lookups along the way comes up empty.
    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                // the index of the tx in the block is the offset:
                // len([start..tx_id])
                // NOTE: `transaction_id` is always `>=` the block's first
                // index
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    /// Returns all transactions of a block; `None` when the block or its body indices are
    /// missing, `Some(vec![])` for an empty block.
    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Returns one `Vec` of transactions per block in `range` (empty blocks yield empty vecs).
    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);

        // `to_range` yields an exclusive end; convert to the inclusive form the indices
        // lookup expects.
        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    /// Transactions are served from static files.
    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    /// Reads senders from wherever they are currently written (static files or the
    /// [`tables::TransactionSenders`] table).
    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.senders_by_tx_range(range)
        } else {
            self.cursor_read_collect::<tables::TransactionSenders>(range)
        }
    }

    /// Reads a single sender from wherever senders are currently written.
    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        if EitherWriterDestination::senders(self).is_static_file() {
            self.static_file_provider.transaction_sender(id)
        } else {
            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
        }
    }
}
1565
1566impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1567    type Receipt = ReceiptTy<N>;
1568
1569    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1570        self.static_file_provider.get_with_static_file_or_database(
1571            StaticFileSegment::Receipts,
1572            id,
1573            |static_file| static_file.receipt(id),
1574            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1575        )
1576    }
1577
1578    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1579        if let Some(id) = self.transaction_id(hash)? {
1580            self.receipt(id)
1581        } else {
1582            Ok(None)
1583        }
1584    }
1585
1586    fn receipts_by_block(
1587        &self,
1588        block: BlockHashOrNumber,
1589    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1590        if let Some(number) = self.convert_hash_or_number(block)? &&
1591            let Some(body) = self.block_body_indices(number)?
1592        {
1593            let tx_range = body.tx_num_range();
1594            return if tx_range.is_empty() {
1595                Ok(Some(Vec::new()))
1596            } else {
1597                self.receipts_by_tx_range(tx_range).map(Some)
1598            }
1599        }
1600        Ok(None)
1601    }
1602
1603    fn receipts_by_tx_range(
1604        &self,
1605        range: impl RangeBounds<TxNumber>,
1606    ) -> ProviderResult<Vec<Self::Receipt>> {
1607        self.static_file_provider.get_range_with_static_file_or_database(
1608            StaticFileSegment::Receipts,
1609            to_range(range),
1610            |static_file, range, _| static_file.receipts_by_tx_range(range),
1611            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1612            |_| true,
1613        )
1614    }
1615
1616    fn receipts_by_block_range(
1617        &self,
1618        block_range: RangeInclusive<BlockNumber>,
1619    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1620        if block_range.is_empty() {
1621            return Ok(Vec::new());
1622        }
1623
1624        // collect block body indices for each block in the range
1625        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
1626        let mut block_body_indices = Vec::with_capacity(range_len);
1627        for block_num in block_range {
1628            if let Some(indices) = self.block_body_indices(block_num)? {
1629                block_body_indices.push(indices);
1630            } else {
1631                // use default indices for missing blocks (empty block)
1632                block_body_indices.push(StoredBlockBodyIndices::default());
1633            }
1634        }
1635
1636        if block_body_indices.is_empty() {
1637            return Ok(Vec::new());
1638        }
1639
1640        // find blocks with transactions to determine transaction range
1641        let non_empty_blocks: Vec<_> =
1642            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1643
1644        if non_empty_blocks.is_empty() {
1645            // all blocks are empty
1646            return Ok(vec![Vec::new(); block_body_indices.len()]);
1647        }
1648
1649        // calculate the overall transaction range
1650        let first_tx = non_empty_blocks[0].first_tx_num();
1651        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1652
1653        // fetch all receipts in the transaction range
1654        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1655        let mut receipts_iter = all_receipts.into_iter();
1656
1657        // distribute receipts to their respective blocks
1658        let mut result = Vec::with_capacity(block_body_indices.len());
1659        for indices in &block_body_indices {
1660            if indices.tx_count == 0 {
1661                result.push(Vec::new());
1662            } else {
1663                let block_receipts =
1664                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1665                result.push(block_receipts);
1666            }
1667        }
1668
1669        Ok(result)
1670    }
1671}
1672
1673impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1674    for DatabaseProvider<TX, N>
1675{
1676    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1677        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1678    }
1679
1680    fn block_body_indices_range(
1681        &self,
1682        range: RangeInclusive<BlockNumber>,
1683    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1684        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1685    }
1686}
1687
1688impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1689    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1690        Ok(if let Some(encoded) = id.get_pre_encoded() {
1691            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1692        } else {
1693            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1694        })
1695    }
1696
1697    /// Get stage checkpoint progress.
1698    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1699        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1700    }
1701
1702    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1703        self.tx
1704            .cursor_read::<tables::StageCheckpoints>()?
1705            .walk(None)?
1706            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1707            .map_err(ProviderError::Database)
1708    }
1709}
1710
1711impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1712    /// Save stage checkpoint.
1713    fn save_stage_checkpoint(
1714        &self,
1715        id: StageId,
1716        checkpoint: StageCheckpoint,
1717    ) -> ProviderResult<()> {
1718        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1719    }
1720
1721    /// Save stage checkpoint progress.
1722    fn save_stage_checkpoint_progress(
1723        &self,
1724        id: StageId,
1725        checkpoint: Vec<u8>,
1726    ) -> ProviderResult<()> {
1727        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1728    }
1729
1730    fn update_pipeline_stages(
1731        &self,
1732        block_number: BlockNumber,
1733        drop_stage_checkpoint: bool,
1734    ) -> ProviderResult<()> {
1735        // iterate over all existing stages in the table and update its progress.
1736        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1737        for stage_id in StageId::ALL {
1738            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1739            cursor.upsert(
1740                stage_id.to_string(),
1741                &StageCheckpoint {
1742                    block_number,
1743                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1744                },
1745            )?;
1746        }
1747
1748        Ok(())
1749    }
1750}
1751
impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    /// Returns the plain-state storage values for the requested `(address, keys)` pairs.
    ///
    /// Slots missing from the database are returned as zero-valued [`StorageEntry`]s, so the
    /// output always mirrors the requested keys one-to-one.
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        // `seek_by_key_subkey` may land on a later subkey within the dup
                        // list, so require an exact key match; absent slots fall back to a
                        // zero-valued entry.
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    /// Collects, per address, the set of storage keys that changed within the given block
    /// range, according to the `StorageChangeSets` table.
    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            // fold all storages and save its old state so we can remove it from HashedStorage
            // it is needed as it is dup table.
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    /// Maps each changed `(address, storage key)` pair to the list of block numbers within
    /// `range` at which it changed.
    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    // The walk is keyed by (block number, address) ascending, so block numbers
                    // are pushed in ascending order for each key.
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}
1814
1815impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1816    for DatabaseProvider<TX, N>
1817{
1818    type Receipt = ReceiptTy<N>;
1819
    /// Persists an [`ExecutionOutcome`]: state reverts (changesets), plain state changes, and
    /// receipts.
    ///
    /// Receipts go to static files or the database via [`EitherWriter`], honoring the
    /// configured pruning modes (full receipt pruning and per-contract log filters).
    fn write_state(
        &self,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        is_value_known: OriginalValuesKnown,
    ) -> ProviderResult<()> {
        let first_block = execution_outcome.first_block();
        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // The tip is the highest of the persisted head and this outcome's last block; pruning
        // distances below are measured from it.
        let tip = self.last_block_number()?.max(last_block);

        let (plain_state, reverts) =
            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block)?;
        self.write_state_changes(plain_state)?;

        // Fetch the first transaction number for each block in the range
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Ensure all expected blocks are present.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            // Indices come back in ascending order, so the missing blocks are at the tail of
            // the range; report the first missing one.
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // All receipts from the last 128 blocks are required for blockchain tree, even with
        // [`PruneSegment::ContractLogs`].
        //
        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
        // files yet. Once receipts exist in static files, we must continue writing to maintain
        // continuity and have no gaps.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Prepare set of addresses which logs should not be pruned.
        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        // `receipts` is per-block; zipping with `block_indices` pairs each block's receipts
        // with its first transaction number.
        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts.iter().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Increment block number for receipts static file writer
            receipts_writer.increment_block(block_number)?;

            // Skip writing receipts if pruning configuration requires us to.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // If there are new addresses to retain after this block number, track them
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                // Receipt numbering follows transaction numbering within the block.
                let receipt_idx = first_tx_index + idx as u64;
                // Skip writing receipt if log filter is active and it does not have any logs to
                // retain
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
1916
    /// Writes state reverts (storage and account changesets) for the blocks starting at
    /// `first_block`.
    ///
    /// Storage changesets are appended to the `StorageChangeSets` dup table; account
    /// changesets are written through [`EitherWriter`] (static files or database).
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
    ) -> ProviderResult<()> {
        // Write storage changes
        tracing::trace!("Writing storage changes");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        let mut storage_changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
        // `reverts.storage` holds one entry per block, offset from `first_block`.
        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;

            tracing::trace!(block_number, "Writing block change");
            // sort changes by address.
            storage_changes.par_sort_unstable_by_key(|a| a.address);
            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                let storage_id = BlockNumberAddress((block_number, address));

                let mut storage = storage_revert
                    .into_iter()
                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
                    .collect::<Vec<_>>();
                // sort storage slots by key.
                storage.par_sort_unstable_by_key(|a| a.0);

                // If we are writing the primary storage wipe transition, the pre-existing plain
                // storage state has to be taken from the database and written to storage history.
                // See [StorageWipe::Primary] for more details.
                //
                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
                // collecting wiped entries into a Vec like this, see
                // `write_storage_trie_changesets`.
                let mut wiped_storage = Vec::new();
                if wiped {
                    tracing::trace!(?address, "Wiping storage");
                    // Collect every dup entry stored under this address prior to the wipe.
                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                        wiped_storage.push((entry.key, entry.value));
                        while let Some(entry) = storages_cursor.next_dup_val()? {
                            wiped_storage.push((entry.key, entry.value))
                        }
                    }
                }

                tracing::trace!(?address, ?storage, "Writing storage reverts");
                // Merge explicit reverts with the wiped entries; both inputs are key-sorted,
                // which `append_dup` requires.
                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
                }
            }
        }

        // Write account changes to static files
        tracing::debug!(target: "sync::stages::merkle_changesets", ?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
1984
    /// Writes plain state changes produced by execution: accounts, bytecodes, and storage.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        // sort all entries so they can be written to database in more performant way.
        // and take smaller memory footprint.
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        // Write new account state
        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
        // write account to database.
        for (address, account) in changes.accounts {
            if let Some(account) = account {
                tracing::trace!(?address, "Updating plain state account");
                accounts_cursor.upsert(address, &account.into())?;
            } else if accounts_cursor.seek_exact(address)?.is_some() {
                // `None` marks a destroyed account: delete it if it exists.
                tracing::trace!(?address, "Deleting plain state account");
                accounts_cursor.delete_current()?;
            }
        }

        // Write bytecode
        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
        for (hash, bytecode) in changes.contracts {
            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
        }

        // Write new storage state and wipe storage if needed.
        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
            // Wiping of storage.
            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                // Drop all dup entries for this address before re-inserting new values.
                storages_cursor.delete_current_duplicates()?;
            }
            // cast storages to B256.
            let mut storage = storage
                .into_iter()
                .map(|(k, value)| StorageEntry { key: k.into(), value })
                .collect::<Vec<_>>();
            // sort storage slots by key.
            storage.par_sort_unstable_by_key(|a| a.key);

            for entry in storage {
                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                // Dup tables require removing the existing entry for this exact key before
                // writing the replacement value.
                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    storages_cursor.delete_current()?;
                }

                // Zero values denote cleared slots and are not stored.
                if !entry.value.is_zero() {
                    storages_cursor.upsert(address, &entry)?;
                }
            }
        }

        Ok(())
    }
2045
    /// Writes the sorted hashed post-state to the `HashedAccounts` and `HashedStorages` tables.
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        // Write hashed account updates.
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                // `None` marks a destroyed account: delete it if it exists.
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Write hashed storage changes.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage drops all existing dup entries for the address first.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Dup tables require removing the existing entry for this exact slot before
                // writing the replacement value.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                // Zero values denote cleared slots and are not stored.
                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }
2084
    /// Remove the last N blocks of state.
    ///
    /// The latest state will be unwound
    ///
    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
    ///    transaction ids.
    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
    ///    the changesets.
    ///    - In order to have both the old and new values in the changesets, we also access the
    ///      plain state tables.
    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
    ///    we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the plain state
    ///     3. Save the old value to the local state
    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
    ///    have seen before we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the local state
    ///     3. Set the local state to the value in the changeset
    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        // Everything strictly above `block` is unwound.
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        // We are not removing block meta as it is used to get block changesets.
        let block_bodies = self.block_body_indices_range(range.clone())?;

        // get transaction receipts
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        // `take` removes the changeset entries from the tables and returns them so they can
        // be replayed against plain state below.
        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;

        // This is not working for blocks that are not at tip. as plain state is not the last
        // state of end range. We should rename the functions or add support to access
        // History state. Accessing history state can be tricky but we are not gaining
        // anything.
        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        let (state, _) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        // iterate over local plain state remove all account and all storages.
        for (address, (old_account, new_account, storage)) in &state {
            // revert account if needed.
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    // Account did not exist before the unwound blocks: remove it.
                    plain_accounts_cursor.delete_current()?;
                }
            }

            // revert storages
            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                // delete previous value
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                // insert value if needed
                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        // Receipts of the unwound transactions are removed as well.
        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }
2174
2175    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2176    ///
2177    /// The latest state will be unwound and returned back with all the blocks
2178    ///
2179    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2180    ///    transaction ids.
2181    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2182    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2183    ///    the changesets.
2184    ///    - In order to have both the old and new values in the changesets, we also access the
2185    ///      plain state tables.
2186    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2187    ///    we:
2188    ///     1. Take the old value from the changeset
2189    ///     2. Take the new value from the plain state
2190    ///     3. Save the old value to the local state
2191    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2192    ///    have seen before we:
2193    ///     1. Take the old value from the changeset
2194    ///     2. Take the new value from the local state
2195    ///     3. Set the local state to the value in the changeset
    fn take_state_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
        // Everything strictly above `block`, up to the current tip, is removed and returned.
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(ExecutionOutcome::default())
        }
        let start_block_number = *range.start();

        // We are not removing block meta as it is used to get block changesets.
        let block_bodies = self.block_body_indices_range(range.clone())?;

        // get transaction receipts
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
        let to_transaction_num =
            block_bodies.last().expect("already checked if there are blocks").last_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());

        // `take` removes the entries from the table and returns them.
        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;

        // This is not working for blocks that are not at tip. as plain state is not the last
        // state of end range. We should rename the functions or add support to access
        // History state. Accessing history state can be tricky but we are not gaining
        // anything.
        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

        // if there are static files for this segment, prune them.
        let highest_changeset_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
            self.cached_storage_settings().account_changesets_in_static_files
        {
            // TODO: add a `take` method that removes and returns the items instead of doing this
            // Read the changesets first, then prune the static file back to `block`.
            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;

            changesets
        } else {
            // Have to remove from static files if they exist, otherwise remove using `take` for the
            // changeset tables
            // NOTE(review): this branch only takes from the DB table; if static files exist but the
            // settings flag is off, they are left untouched — confirm that combination is invalid.
            self.take::<tables::AccountChangeSets>(range)?
        };

        // populate bundle state and reverts from changesets / state cursors, to iterate over,
        // remove, and return later
        let (state, reverts) = self.populate_bundle_state(
            account_changeset,
            storage_changeset,
            &mut plain_accounts_cursor,
            &mut plain_storage_cursor,
        )?;

        // iterate over local plain state remove all account and all storages.
        for (address, (old_account, new_account, storage)) in &state {
            // revert account if needed.
            if old_account != new_account {
                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                if let Some(account) = old_account {
                    plain_accounts_cursor.upsert(*address, account)?;
                } else if existing_entry.is_some() {
                    // Account did not exist before `block`: remove it from plain state.
                    plain_accounts_cursor.delete_current()?;
                }
            }

            // revert storages
            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
                // delete previous value
                if plain_storage_cursor
                    .seek_by_key_subkey(*address, *storage_key)?
                    .filter(|s| s.key == *storage_key)
                    .is_some()
                {
                    plain_storage_cursor.delete_current()?
                }

                // insert value if needed
                if !old_storage_value.is_zero() {
                    plain_storage_cursor.upsert(*address, &storage_entry)?;
                }
            }
        }

        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
        let mut receipts_iter = self
            .static_file_provider
            .get_range_with_static_file_or_database(
                StaticFileSegment::Receipts,
                from_transaction_num..to_transaction_num + 1,
                |static_file, range, _| {
                    static_file
                        .receipts_by_tx_range(range.clone())
                        .map(|r| range.into_iter().zip(r).collect())
                },
                |range, _| {
                    self.tx
                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
                        .walk_range(range)?
                        .map(|r| r.map_err(Into::into))
                        .collect()
                },
                |_| true,
            )?
            .into_iter()
            .peekable();

        let mut receipts = Vec::with_capacity(block_bodies.len());
        // loop break if we are at the end of the blocks.
        for block_body in block_bodies {
            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
            for num in block_body.tx_num_range() {
                // A pruned receipt simply yields no entry for `num`, leaving a gap.
                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
                    block_receipts.push(receipts_iter.next().unwrap().1);
                }
            }
            receipts.push(block_receipts);
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(ExecutionOutcome::new_init(
            state,
            reverts,
            Vec::new(),
            receipts,
            start_block_number,
            Vec::new(),
        ))
    }
2333}
2334
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
    /// Writes trie updates to the database with already sorted updates.
    ///
    /// Returns the number of entries modified.
    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
        if trie_updates.is_empty() {
            return Ok(0)
        }

        // Track the number of inserted entries.
        let mut num_entries = 0;

        let tx = self.tx_ref();
        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;

        // Process sorted account nodes
        for (key, updated_node) in trie_updates.account_nodes_ref() {
            let nibbles = StoredNibbles(*key);
            match updated_node {
                Some(node) => {
                    // The empty path (trie root) is never persisted to the table.
                    if !nibbles.0.is_empty() {
                        num_entries += 1;
                        account_trie_cursor.upsert(nibbles, node)?;
                    }
                }
                None => {
                    // Removal: counted even if the node is already absent.
                    num_entries += 1;
                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
                        account_trie_cursor.delete_current()?;
                    }
                }
            }
        }

        num_entries +=
            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;

        Ok(num_entries)
    }

    /// Records the current values of all trie nodes which will be updated using the `TrieUpdates`
    /// into the trie changesets tables.
    ///
    /// The intended usage of this method is to call it _prior_ to calling `write_trie_updates` with
    /// the same `TrieUpdates`.
    ///
    /// Returns the number of keys written.
    fn write_trie_changesets(
        &self,
        block_number: BlockNumber,
        trie_updates: &TrieUpdatesSorted,
        updates_overlay: Option<&TrieUpdatesSorted>,
    ) -> ProviderResult<usize> {
        let mut num_entries = 0;

        let mut changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
        let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;

        // Wrap the cursor in DatabaseAccountTrieCursor
        let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);

        // Create empty TrieUpdatesSorted for when updates_overlay is None
        let empty_updates = TrieUpdatesSorted::default();
        let overlay = updates_overlay.unwrap_or(&empty_updates);

        // Wrap the cursor in InMemoryTrieCursor with the overlay
        let mut in_memory_account_cursor =
            InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);

        for (path, _) in trie_updates.account_nodes_ref() {
            num_entries += 1;
            // Record the *pre-update* value (None if the node does not currently exist).
            let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
            // `append_dup` requires paths in ascending order within the block, which
            // `account_nodes_ref` provides since the updates are sorted.
            changeset_cursor.append_dup(
                block_number,
                TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
            )?;
        }

        let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
        storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));

        num_entries += self.write_storage_trie_changesets(
            block_number,
            storage_updates.into_iter(),
            updates_overlay,
        )?;

        Ok(num_entries)
    }

    /// Drops both trie changeset tables in their entirety.
    fn clear_trie_changesets(&self) -> ProviderResult<()> {
        let tx = self.tx_ref();
        tx.clear::<tables::AccountsTrieChangeSets>()?;
        tx.clear::<tables::StoragesTrieChangeSets>()?;
        Ok(())
    }

    /// Deletes all trie changeset entries for blocks `from..` (inclusive of `from`).
    fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let tx = self.tx_ref();
        {
            let range = from..;
            let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
            let mut walker = cursor.walk_range(range)?;

            while walker.next().transpose()?.is_some() {
                walker.delete_current()?;
            }
        }

        {
            // Storage changesets are keyed by (block, hashed address); (from, ZERO) is the
            // smallest key at block `from`.
            let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
            let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
            let mut walker = cursor.walk_range(range)?;

            while walker.next().transpose()?.is_some() {
                walker.delete_current()?;
            }
        }

        Ok(())
    }
}
2458
impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
    /// Builds the set of trie updates that would revert the trie to its state as of just before
    /// block `from`, by taking the oldest recorded changeset value for every touched node.
    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
        let tx = self.tx_ref();

        // Read account trie changes directly into a Vec - data is already sorted by nibbles
        // within each block, and we want the oldest (first) version of each node sorted by path.
        let mut account_nodes = Vec::new();
        let mut seen_account_keys = HashSet::new();
        let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;

        for entry in accounts_cursor.walk_range(from..)? {
            let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
            // Only keep the first (oldest) version of each node
            if seen_account_keys.insert(nibbles.0) {
                account_nodes.push((nibbles.0, node));
            }
        }

        account_nodes.sort_by_key(|(path, _)| *path);

        // Read storage trie changes - data is sorted by (block, hashed_address, nibbles)
        // Keep track of seen (address, nibbles) pairs to only keep the oldest version per address,
        // sorted by path.
        let mut storage_tries = B256Map::<Vec<_>>::default();
        let mut seen_storage_keys = HashSet::new();
        let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;

        // Create storage range starting from `from` block
        let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));

        for entry in storages_cursor.walk_range(storage_range_start..)? {
            let (
                BlockNumberHashedAddress((_, hashed_address)),
                TrieChangeSetsEntry { nibbles, node },
            ) = entry?;

            // Only keep the first (oldest) version of each node for this address
            if seen_storage_keys.insert((hashed_address, nibbles.0)) {
                storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
            }
        }

        // Convert to StorageTrieUpdatesSorted
        let storage_tries = storage_tries
            .into_iter()
            .map(|(address, mut nodes)| {
                // Per-account node lists must be path-sorted for the sorted-updates contract.
                nodes.sort_by_key(|(path, _)| *path);
                (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
            })
            .collect();

        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
    }

    /// Reconstructs the trie updates produced by a single block, by reading which paths changed
    /// in that block and looking up their post-block values through a revert overlay.
    fn get_block_trie_updates(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted> {
        let tx = self.tx_ref();

        // Step 1: Get the trie reverts for the state after the target block
        let reverts = self.trie_reverts(block_number + 1)?;

        // Step 2: Create an InMemoryTrieCursorFactory with the reverts
        // This gives us the trie state as it was after the target block was processed
        let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
        let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);

        // Step 3: Collect all account trie nodes that changed in the target block
        let mut account_nodes = Vec::new();

        // Walk through all account trie changes for this block
        let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
        let mut account_cursor = cursor_factory.account_trie_cursor()?;

        for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
            let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
            // Look up the current value of this trie node using the overlay cursor
            let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
            account_nodes.push((nibbles.0, node_value));
        }

        // Step 4: Collect all storage trie nodes that changed in the target block
        let mut storage_tries = B256Map::default();
        let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
        // Half-open key range covering exactly `block_number` for every hashed address.
        let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
        let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));

        let mut current_hashed_address = None;
        let mut storage_cursor = None;

        for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
            let (
                BlockNumberHashedAddress((_, hashed_address)),
                TrieChangeSetsEntry { nibbles, .. },
            ) = entry?;

            // Check if we need to create a new storage cursor for a different account
            if current_hashed_address != Some(hashed_address) {
                storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
                current_hashed_address = Some(hashed_address);
            }

            // Look up the current value of this storage trie node
            let cursor =
                storage_cursor.as_mut().expect("storage_cursor was just initialized above");
            let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
            storage_tries
                .entry(hashed_address)
                .or_insert_with(|| StorageTrieUpdatesSorted {
                    storage_nodes: Vec::new(),
                    is_deleted: false,
                })
                .storage_nodes
                .push((nibbles.0, node_value));
        }

        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
    }
}
2579
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
    /// Writes storage trie updates from the given storage trie map with already sorted updates.
    ///
    /// Expects the storage trie updates to already be sorted by the hashed address key.
    ///
    /// Returns the number of entries modified.
    fn write_storage_trie_updates_sorted<'a>(
        &self,
        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
    ) -> ProviderResult<usize> {
        let mut num_entries = 0;
        // Re-sort defensively by hashed address so writes are always in key order.
        let mut storage_tries = storage_tries.collect::<Vec<_>>();
        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
        for (hashed_address, storage_trie_updates) in storage_tries {
            // DatabaseStorageTrieCursor takes ownership of the cursor; recover it after each
            // account so one cursor serves the whole loop.
            let mut db_storage_trie_cursor =
                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
            num_entries +=
                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
            cursor = db_storage_trie_cursor.cursor;
        }

        Ok(num_entries)
    }

    /// Records the current values of all trie nodes which will be updated using the
    /// `StorageTrieUpdates` into the storage trie changesets table.
    ///
    /// The intended usage of this method is to call it _prior_ to calling
    /// `write_storage_trie_updates` with the same set of `StorageTrieUpdates`.
    ///
    /// Returns the number of keys written.
    fn write_storage_trie_changesets<'a>(
        &self,
        block_number: BlockNumber,
        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
        updates_overlay: Option<&TrieUpdatesSorted>,
    ) -> ProviderResult<usize> {
        let mut num_written = 0;

        let mut changeset_cursor =
            self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;

        // We hold two cursors to the same table because we use them simultaneously when an
        // account's storage is wiped. We keep them outside the for-loop so they can be re-used
        // between accounts.
        let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
        let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;

        // DatabaseStorageTrieCursor requires ownership of the cursor. The easiest way to deal with
        // this is to create this outer variable with an initial dummy account, and overwrite it on
        // every loop for every real account.
        let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
            changed_curr_values_cursor,
            B256::default(), // Will be set per iteration
        );
        let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
            wiped_nodes_cursor,
            B256::default(), // Will be set per iteration
        );

        // Create empty TrieUpdatesSorted for when updates_overlay is None
        let empty_updates = TrieUpdatesSorted::default();

        for (hashed_address, storage_trie_updates) in storage_tries {
            let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));

            // Update the hashed address for the cursors
            changed_curr_values_cursor =
                DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);

            // Get the overlay updates, or use empty updates
            let overlay = updates_overlay.unwrap_or(&empty_updates);

            // Wrap the cursor in InMemoryTrieCursor with the overlay
            let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
                &mut changed_curr_values_cursor,
                overlay,
                *hashed_address,
            );

            // Create an iterator which produces the current values of all updated paths, or None if
            // they are currently unset.
            let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
                storage_trie_updates.storage_nodes.iter().map(|e| e.0),
                &mut in_memory_changed_cursor,
            )?;

            if storage_trie_updates.is_deleted() {
                // Create an iterator that starts from the beginning of the storage trie for this
                // account
                wiped_nodes_cursor =
                    DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);

                // Wrap the wiped nodes cursor in InMemoryTrieCursor with the overlay
                let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
                    &mut wiped_nodes_cursor,
                    overlay,
                    *hashed_address,
                );

                let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);

                // Merge changed-path values with every existing node, so the changeset captures
                // the full pre-wipe trie for this account.
                for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
                    let (path, node) = wiped?;
                    num_written += 1;
                    changeset_cursor.append_dup(
                        changeset_key,
                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
                    )?;
                }
            } else {
                // Only the explicitly-updated paths need their prior values recorded.
                for curr_value in curr_values_of_changed {
                    let (path, node) = curr_value?;
                    num_written += 1;
                    changeset_cursor.append_dup(
                        changeset_key,
                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
                    )?;
                }
            }
        }

        Ok(num_written)
    }
}
2706
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Reverts `HashedAccounts` using the given account changesets: each account is reset to its
    /// pre-change value (or deleted if it did not exist).
    ///
    /// Returns the map of hashed address to the reverted account value.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Aggregate all block changesets and make a list of accounts that have been changed.
        // Note that collecting and then reversing the order is necessary to ensure that the
        // changes are applied in the correct order.
        // Reversing makes the *oldest* changeset entry win for each address, since later
        // BTreeMap inserts overwrite earlier ones.
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        // Apply values to HashedState, and remove the account if it's None.
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Range-based convenience wrapper around [`Self::unwind_account_hashing`]: loads the
    /// account changesets for `range` from the database first.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .collect::<Result<Vec<_>, _>>()?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Hashes the given accounts and upserts them into `HashedAccounts` (deleting entries whose
    /// value is `None`). Returns the hashed map that was applied.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Reverts `HashedStorages` using the given storage changesets: each slot is reset to its
    /// pre-change value (zero values are deleted rather than stored).
    ///
    /// Returns, per hashed address, the set of hashed storage keys that were touched.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
        // Aggregate all block changesets and make list of accounts that have been changed.
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
            })
            .collect::<Vec<_>>();
        // Stable sort: entries for the same (address, key) keep their input order, so the
        // reversed iteration below applies the earliest (oldest) changeset value last.
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        // Apply values to HashedState, and remove the account if it's None.
        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .filter(|entry| entry.key == key)
                .is_some()
            {
                hashed_storage.delete_current()?;
            }

            // Zero means "slot unset": leave it deleted instead of storing a zero entry.
            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Range-based convenience wrapper around [`Self::unwind_storage_hashing`]: loads the
    /// storage changesets for `range` from the database first.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumberAddress>,
    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
        let changesets = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(range)?
            .collect::<Result<Vec<_>, _>>()?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Hashes the given storages (address and slot keys) and applies them to `HashedStorages`;
    /// zero values delete the slot. Returns the hashed keys that were written per address.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
        // hash values
        // Nested BTreeMaps dedupe: later entries for the same hashed address/key overwrite
        // earlier ones.
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Hash the address and key and apply them to HashedStorage (if Storage is None
        // just remove it);
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .filter(|entry| entry.key == key)
                    .is_some()
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
2855
2856impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
    /// Unwinds the `AccountsHistory` index for every account touched by the given changesets:
    /// block numbers >= the changeset's block are removed from that account's shards.
    ///
    /// Returns the number of changeset entries processed.
    fn unwind_account_history_indices<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<usize> {
        let mut last_indices = changesets
            .into_iter()
            .map(|(index, account)| (account.address, *index))
            .collect::<Vec<_>>();
        // Sort by address so shard unwinding proceeds in key order.
        last_indices.sort_by_key(|(a, _)| *a);

        // Unwind the account history index.
        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
        for &(address, rem_index) in &last_indices {
            // Removes whole shards at/after `rem_index` and returns the surviving prefix of
            // the boundary shard (block numbers < rem_index).
            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
                &mut cursor,
                ShardedKey::last(address),
                rem_index,
                |sharded_key| sharded_key.key == address,
            )?;

            // Check the last returned partial shard.
            // If it's not empty, the shard needs to be reinserted.
            if !partial_shard.is_empty() {
                cursor.insert(
                    ShardedKey::last(address),
                    &BlockNumberList::new_pre_sorted(partial_shard),
                )?;
            }
        }

        let changesets = last_indices.len();
        Ok(changesets)
    }
2890
2891    fn unwind_account_history_indices_range(
2892        &self,
2893        range: impl RangeBounds<BlockNumber>,
2894    ) -> ProviderResult<usize> {
2895        let changesets = self
2896            .tx
2897            .cursor_read::<tables::AccountChangeSets>()?
2898            .walk_range(range)?
2899            .collect::<Result<Vec<_>, _>>()?;
2900        self.unwind_account_history_indices(changesets.iter())
2901    }
2902
2903    fn insert_account_history_index(
2904        &self,
2905        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2906    ) -> ProviderResult<()> {
2907        self.append_history_index::<_, tables::AccountsHistory>(
2908            account_transitions,
2909            ShardedKey::new,
2910        )
2911    }
2912
    /// Unwinds the `StoragesHistory` index for every (account, slot) touched by the given storage
    /// changesets: block numbers >= the changeset's block are removed from that slot's shards.
    ///
    /// Returns the number of changeset entries processed.
    fn unwind_storage_history_indices(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<usize> {
        let mut storage_changesets = changesets
            .into_iter()
            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
            .collect::<Vec<_>>();
        // Sort by (address, slot) so shard unwinding proceeds in key order.
        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));

        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
        for &(address, storage_key, rem_index) in &storage_changesets {
            // Removes whole shards at/after `rem_index` and returns the surviving prefix of
            // the boundary shard (block numbers < rem_index).
            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
                &mut cursor,
                StorageShardedKey::last(address, storage_key),
                rem_index,
                |storage_sharded_key| {
                    storage_sharded_key.address == address &&
                        storage_sharded_key.sharded_key.key == storage_key
                },
            )?;

            // Check the last returned partial shard.
            // If it's not empty, the shard needs to be reinserted.
            if !partial_shard.is_empty() {
                cursor.insert(
                    StorageShardedKey::last(address, storage_key),
                    &BlockNumberList::new_pre_sorted(partial_shard),
                )?;
            }
        }

        let changesets = storage_changesets.len();
        Ok(changesets)
    }
2948
2949    fn unwind_storage_history_indices_range(
2950        &self,
2951        range: impl RangeBounds<BlockNumberAddress>,
2952    ) -> ProviderResult<usize> {
2953        let changesets = self
2954            .tx
2955            .cursor_read::<tables::StorageChangeSets>()?
2956            .walk_range(range)?
2957            .collect::<Result<Vec<_>, _>>()?;
2958        self.unwind_storage_history_indices(changesets.into_iter())
2959    }
2960
2961    fn insert_storage_history_index(
2962        &self,
2963        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2964    ) -> ProviderResult<()> {
2965        self.append_history_index::<_, tables::StoragesHistory>(
2966            storage_transitions,
2967            |(address, storage_key), highest_block_number| {
2968                StorageShardedKey::new(address, storage_key, highest_block_number)
2969            },
2970        )
2971    }
2972
2973    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2974        // account history stage
2975        {
2976            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2977            self.insert_account_history_index(indices)?;
2978        }
2979
2980        // storage history stage
2981        {
2982            let indices = self.changed_storages_and_blocks_with_range(range)?;
2983            self.insert_storage_history_index(indices)?;
2984        }
2985
2986        Ok(())
2987    }
2988}
2989
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Removes all blocks above `block` and returns them, together with their execution
    /// output, as a [`Chain`].
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        let range = block + 1..=self.last_block_number()?;

        // Unwind trie state first so it matches the state taken below.
        self.unwind_trie_state_from(block + 1)?;

        // get execution res
        let execution_state = self.take_state_above(block)?;

        let blocks = self.recovered_block_range(range)?;

        // Block bodies are needed both for reading the block range and for collecting the
        // execution results above, which is why they are only removed afterwards.
        self.remove_blocks_above(block)?;

        // Update pipeline progress
        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, BTreeMap::new(), BTreeMap::new()))
    }

    /// Removes all blocks above `block` along with their execution output, without
    /// returning them.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        // remove execution res
        self.remove_state_above(block)?;

        // Block bodies are needed while removing the execution results above, which is why
        // they are only removed afterwards.
        self.remove_blocks_above(block)?;

        // Update pipeline progress
        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
3032
3033impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
3034    for DatabaseProvider<TX, N>
3035{
3036    type Block = BlockTy<N>;
3037    type Receipt = ReceiptTy<N>;
3038
    /// Inserts the block into the database, always modifying the following static file segments and
    /// tables:
    /// * [`StaticFileSegment::Headers`]
    /// * [`tables::HeaderNumbers`]
    /// * [`tables::BlockBodyIndices`]
    ///
    /// If there are transactions in the block, the following static file segments and tables will
    /// be modified:
    /// * [`StaticFileSegment::Transactions`]
    /// * [`tables::TransactionBlocks`]
    ///
    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
    /// If withdrawals are not empty, this will modify
    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
    ///
    /// If the provider has __not__ configured full sender pruning, this will modify either:
    /// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
    /// * [`tables::TransactionSenders`] if senders are written to the database
    ///
    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
    fn insert_block(
        &self,
        block: &RecoveredBlock<Self::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Write the header to its static file and index its hash -> number mapping.
        self.static_file_provider
            .get_writer(block_number, StaticFileSegment::Headers)?
            .append_header(block.header(), &block.hash())?;

        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);

        // The first tx number of this block continues from the highest indexed transaction,
        // or starts at 0 for an empty database.
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();
        durations_recorder.record_relative(metrics::Action::GetNextTxNum);

        // Unbounded ascending tx-number sequence starting at `first_tx_num`.
        let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));

        // Skip sender writing entirely when full sender pruning is configured.
        if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
            let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
            senders_writer.increment_block(block.number())?;
            senders_writer
                .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
            durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
        }

        // Likewise, skip the tx-hash -> number index when full lookup pruning is configured.
        if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
                    let hash = transaction.tx_hash();
                    writer.put_transaction_hash_number(*hash, tx_num, false)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
            durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
        }

        // Write the body (transactions, body indices, chain-specific tables).
        self.append_block_bodies(vec![(block_number, Some(block.body()))])?;

        debug!(
            target: "providers::db",
            ?block_number,
            actions = ?durations_recorder.actions,
            "Inserted block"
        );

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
3117
    /// Appends the given block bodies: transactions go to static files, body indices and the
    /// tx-number -> block index to the database.
    ///
    /// A `None` body records an empty block. Bodies are appended in order; `append` on the
    /// cursors requires block numbers beyond the currently highest stored entry.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        // Nothing to do for an empty input.
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        // Initialize writer if we will be writing transactions to staticfiles
        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Get id for the next tx_num or zero if there are no transactions.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Increment block on static file header.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            // insert block meta
            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            // Empty blocks only get their body indices recorded.
            let Some(body) = body else { continue };

            // write transaction block index (keyed by the block's last tx number)
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            // write transactions
            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                // Increment transaction id for each transaction.
                next_tx_num += 1;
            }
        }

        // Delegate chain-specific body tables (e.g. ommers, withdrawals) to the configured
        // storage writer.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }
3169
    /// Removes all headers, transactions, senders and tx-hash index entries above `block`,
    /// from both the database and static files.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        // Get highest static file block for the total block range
        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
        // we remove only what is ABOVE the block.
        //
        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
        // will have three blocks to remove, which will be block 8, 7, and 6.
        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction to be removed
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Last transaction to be removed
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            // shouldn't happen because this was OK above
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        // Drop the tx-hash -> tx-number index entries for all removed transactions.
        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        // Prune senders (static file or database, depending on configuration).
        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;

        self.remove_bodies_above(block)?;

        Ok(())
    }
3226
    /// Removes all block bodies above `block`: chain-specific body tables, body indices, the
    /// tx-number -> block index, and the transactions stored in static files.
    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.storage.writer().remove_block_bodies_above(self, block)?;

        // First transaction to be removed
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;

        let static_file_tx_num =
            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

        // Number of static-file transactions to prune: everything from `unwind_tx_from` up to
        // and including the highest static-file tx number (zero if there is none).
        let to_delete = static_file_tx_num
            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
            .unwrap_or_default();

        self.static_file_provider
            .latest_writer(StaticFileSegment::Transactions)?
            .prune_transactions(to_delete, block)?;

        Ok(())
    }
3252
    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
    ///
    /// Appends the given blocks together with their execution outcome, hashed state and
    /// history indices, then advances the pipeline checkpoints to the last block.
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
        // `None`.
        let first_number = blocks[0].number();

        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
        // `None`.
        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
        // This is necessary because with edge storage, changesets are written to static files
        // whose index isn't updated until commit, making them invisible to subsequent reads
        // within the same transaction.
        let (account_transitions, storage_transitions) = {
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            // Reverts are ordered per block, starting at `first_number`.
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        let key = B256::new(storage_key.to_be_bytes());
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        // Insert the blocks
        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        // insert hashes and intermediate merkle nodes
        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        // Use pre-computed transitions for history indices since static file
        // writes aren't visible until commit.
        self.insert_account_history_index(account_transitions)?;
        self.insert_storage_history_index(storage_transitions)?;
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        // Update pipeline progress
        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
3322}
3323
3324impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3325    fn get_prune_checkpoint(
3326        &self,
3327        segment: PruneSegment,
3328    ) -> ProviderResult<Option<PruneCheckpoint>> {
3329        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3330    }
3331
3332    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3333        Ok(PruneSegment::variants()
3334            .filter_map(|segment| {
3335                self.tx
3336                    .get::<tables::PruneCheckpoints>(segment)
3337                    .transpose()
3338                    .map(|chk| chk.map(|chk| (segment, chk)))
3339            })
3340            .collect::<Result<_, _>>()?)
3341    }
3342}
3343
3344impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3345    fn save_prune_checkpoint(
3346        &self,
3347        segment: PruneSegment,
3348        checkpoint: PruneCheckpoint,
3349    ) -> ProviderResult<()> {
3350        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3351    }
3352}
3353
3354impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3355    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3356        let db_entries = self.tx.entries::<T>()?;
3357        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3358            Ok(entries) => entries,
3359            Err(ProviderError::UnsupportedProvider) => 0,
3360            Err(err) => return Err(err),
3361        };
3362
3363        Ok(db_entries + static_file_entries)
3364    }
3365}
3366
3367impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3368    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3369        let mut finalized_blocks = self
3370            .tx
3371            .cursor_read::<tables::ChainState>()?
3372            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3373            .take(1)
3374            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3375
3376        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3377        Ok(last_finalized_block_number)
3378    }
3379
3380    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3381        let mut finalized_blocks = self
3382            .tx
3383            .cursor_read::<tables::ChainState>()?
3384            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3385            .take(1)
3386            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3387
3388        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3389        Ok(last_finalized_block_number)
3390    }
3391}
3392
3393impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3394    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3395        Ok(self
3396            .tx
3397            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3398    }
3399
3400    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3401        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3402    }
3403}
3404
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Returns a shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns a mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider, returning the underlying database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    fn prune_modes_ref(&self) -> &PruneModes {
        // NOTE(review): this resolves to the inherent `prune_modes_ref` (inherent methods take
        // precedence over trait methods in method resolution); otherwise this call would
        // recurse infinitely — confirm the inherent method exists on `DatabaseProvider`.
        self.prune_modes_ref()
    }

    /// Commit database transaction, static files, and pending `RocksDB` batches.
    fn commit(self) -> ProviderResult<bool> {
        // For unwinding it makes more sense to commit the database first, since if
        // it is interrupted before the static files commit, we can just
        // truncate the static files according to the
        // checkpoints on the next start-up.
        if self.static_file_provider.has_unwind_queued() {
            self.tx.commit()?;

            // Flush every RocksDB batch queued during this provider's lifetime.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // In the append case the order is inverted: static files first, so an interruption
            // before the database commit leaves only extra (reconcilable) static-file data.
            self.static_file_provider.commit()?;

            // Flush every RocksDB batch queued during this provider's lifetime.
            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.tx.commit()?;
        }

        Ok(true)
    }
}
3459
3460impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3461    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3462        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3463    }
3464}
3465
3466impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3467    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3468        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3469    }
3470}
3471
3472impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3473    fn cached_storage_settings(&self) -> StorageSettings {
3474        *self.storage_settings.read()
3475    }
3476
3477    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3478        *self.storage_settings.write() = settings;
3479    }
3480}
3481
3482#[cfg(test)]
3483mod tests {
3484    use super::*;
3485    use crate::{
3486        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3487        BlockWriter,
3488    };
3489    use reth_ethereum_primitives::Receipt;
3490    use reth_testing_utils::generators::{self, random_block, BlockParams};
3491    use reth_trie::Nibbles;
3492
3493    #[test]
3494    fn test_receipts_by_block_range_empty_range() {
3495        let factory = create_test_provider_factory();
3496        let provider = factory.provider().unwrap();
3497
3498        // empty range should return empty vec
3499        let start = 10u64;
3500        let end = 9u64;
3501        let result = provider.receipts_by_block_range(start..=end).unwrap();
3502        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3503    }
3504
3505    #[test]
3506    fn test_receipts_by_block_range_nonexistent_blocks() {
3507        let factory = create_test_provider_factory();
3508        let provider = factory.provider().unwrap();
3509
3510        // non-existent blocks should return empty vecs for each block
3511        let result = provider.receipts_by_block_range(10..=12).unwrap();
3512        assert_eq!(result, vec![vec![], vec![], vec![]]);
3513    }
3514
    #[test]
    fn test_receipts_by_block_range_single_block() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Insert genesis (block 0, no receipts) so block and tx numbering line up.
        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
            )
            .unwrap();
        provider_rw.insert_block(&data.blocks[0].0).unwrap();
        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=1).unwrap();

        // should have one vec with one receipt
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
    }
3540
    #[test]
    fn test_receipts_by_block_range_multiple_blocks() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Insert genesis (block 0, no receipts) so block and tx numbering line up.
        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
            )
            .unwrap();
        // Insert blocks 1-3, each with its execution outcome.
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // should have 3 vecs, each with one receipt
        assert_eq!(result.len(), 3);
        for (i, block_receipts) in result.iter().enumerate() {
            assert_eq!(block_receipts.len(), 1);
            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
        }
    }
3570
    // NOTE(review): despite the name, the default fixture yields exactly one receipt per
    // block, so the tx counts do not actually vary — consider extending the fixture.
    #[test]
    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Insert genesis (block 0, no receipts) so block and tx numbering line up.
        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
            )
            .unwrap();

        // insert blocks 1-3 with receipts
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // verify each block has one receipt
        assert_eq!(result.len(), 3);
        for block_receipts in &result {
            assert_eq!(block_receipts.len(), 1);
        }
    }
3601
    #[test]
    fn test_receipts_by_block_range_partial_range() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Insert genesis (block 0, no receipts) so block and tx numbering line up.
        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // request range that includes both existing and non-existing blocks
        let result = provider.receipts_by_block_range(2..=5).unwrap();
        assert_eq!(result.len(), 4);

        // blocks 2-3 should have receipts, blocks 4-5 should be empty
        assert_eq!(result[0].len(), 1); // block 2
        assert_eq!(result[1].len(), 1); // block 3
        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)

        // `data.blocks` is 0-indexed: blocks[1] is block 2, blocks[2] is block 3.
        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
    }
3636
    #[test]
    fn test_receipts_by_block_range_all_empty_blocks() {
        let factory = create_test_provider_factory();
        let mut rng = generators::rng();

        // create blocks with no transactions
        let mut blocks = Vec::new();
        for i in 0..3 {
            let block =
                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
            blocks.push(block);
        }

        let provider_rw = factory.provider_rw().unwrap();
        for block in blocks {
            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // Transaction-less blocks produce a slot each, with zero receipts.
        assert_eq!(result.len(), 3);
        for block_receipts in result {
            assert_eq!(block_receipts.len(), 0);
        }
    }
3664
    // Verifies `receipts_by_block_range` agrees with per-block `receipts_by_block` lookups.
    #[test]
    fn test_receipts_by_block_range_consistency_with_individual_calls() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        // Insert genesis (block 0, no receipts) so block and tx numbering line up.
        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // get receipts using block range method
        let range_result = provider.receipts_by_block_range(1..=3).unwrap();

        // get receipts using individual block calls
        let mut individual_results = Vec::new();
        for block_num in 1..=3 {
            let receipts =
                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
            individual_results.push(receipts);
        }

        assert_eq!(range_result, individual_results);
    }
3699
    /// Exercises `write_trie_changesets` without an overlay: the previous value of each
    /// changed node is read directly from the `AccountsTrie`/`StoragesTrie` tables.
    ///
    /// Covers three cases:
    /// - account trie: one update of a pre-seeded node and one removal of a missing node;
    /// - a normal storage trie: one update of a pre-seeded node and one brand-new node;
    /// - a wiped (`is_deleted`) storage trie: every node already in the db must be
    ///   captured in the changeset, including nodes not touched by the update itself.
    #[test]
    fn test_write_trie_changesets() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let block_number = 1u64;

        // Create some test nibbles and nodes
        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111, // state_mask
            0b0000_0000_0000_0000, // tree_mask
            0b0000_0000_0000_0000, // hash_mask
            vec![],                // hashes
            None,                  // root hash
        );

        // Pre-populate AccountsTrie with a node that will be updated (for account_nibbles1)
        {
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
        }

        // Create account trie updates: one Some (update) and one None (removal)
        let account_nodes = vec![
            (account_nibbles1, Some(node1.clone())), // This will update existing node
            (account_nibbles2, None),                // This will be a removal (no existing node)
        ];

        // Create storage trie updates
        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie

        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_0000_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0000_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create an old version of storage_node1 to prepopulate
        let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000, // Different mask to show it's an old value
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Pre-populate StoragesTrie for normal storage (storage_address1)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1) with old value
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1_old.clone(),
            };
            cursor.upsert(storage_address1, &entry).unwrap();
        }

        // Pre-populate StoragesTrie for wiped storage (storage_address2)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1)
            let entry1 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1.clone(),
            };
            cursor.upsert(storage_address2, &entry1).unwrap();
            // Add node that won't be updated but exists (storage_nibbles3)
            let entry3 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles3),
                node: storage_node2.clone(),
            };
            cursor.upsert(storage_address2, &entry3).unwrap();
        }

        // Normal storage trie: one Some (update) and one None (new)
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // This will update existing node
                (storage_nibbles2, None),                        // This is a new node
            ],
        };

        // Wiped storage trie
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // Updated node already in db
                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in db
                                                                  * storage_nibbles3 is in db
                                                                  * but not updated */
            ],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        // Bundle account and storage updates into the sorted form the writer expects.
        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the changesets
        let num_written =
            provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();

        // Verify number of entries written
        // Account changesets: 2 (one update, one removal)
        // Storage changesets:
        //   - Normal storage: 2 (one update, one removal)
        //   - Wiped storage: 3 (two updated, one existing not updated)
        // Total: 2 + 2 + 3 = 7
        assert_eq!(num_written, 7);

        // Verify account changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();

            // Get all entries for this block to see what was written
            let all_entries = cursor
                .walk_dup(Some(block_number), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            // Assert the full value of all_entries in a single assert_eq
            assert_eq!(
                all_entries,
                vec![
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles1),
                            node: Some(node1),
                        }
                    ),
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles2),
                            node: None,
                        }
                    ),
                ]
            );
        }

        // Verify storage changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            // Check normal storage trie changesets
            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
            let entries1 =
                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries1,
                vec![
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old), // Old value that was prepopulated
                        }
                    ),
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // New node, no previous value
                        }
                    ),
                ]
            );

            // Check wiped storage trie changesets
            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
            let entries2 =
                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries2,
                vec![
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1), // Was in db, so has old value
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // Was not in db
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2), // Existing node in wiped storage
                        }
                    ),
                ]
            );
        }

        provider_rw.commit().unwrap();
    }
3934
    /// Exercises `write_trie_changesets` WITH an overlay: the database tables are left
    /// empty, so every "previous" node value recorded in the changesets must come from
    /// the supplied overlay `TrieUpdatesSorted` rather than from the db.
    ///
    /// Mirrors `test_write_trie_changesets` (same update shapes, same expected entry
    /// count of 7) with the pre-seeded db state replaced by overlay entries.
    #[test]
    fn test_write_trie_changesets_with_overlay() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::BranchNodeCompact;

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let block_number = 1u64;

        // Create some test nibbles and nodes
        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111, // state_mask
            0b0000_0000_0000_0000, // tree_mask
            0b0000_0000_0000_0000, // hash_mask
            vec![],                // hashes
            None,                  // root hash
        );

        // NOTE: Unlike the previous test, we're NOT pre-populating the database
        // All node values will come from the overlay

        // Create the overlay with existing values that would normally be in the DB
        let node1_old = BranchNodeCompact::new(
            0b1010_1010_1010_1010, // Different mask to show it's the overlay "existing" value
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create overlay account nodes
        let overlay_account_nodes = vec![
            (account_nibbles1, Some(node1_old.clone())), // This simulates existing node in overlay
        ];

        // Create account trie updates: one Some (update) and one None (removal)
        let account_nodes = vec![
            (account_nibbles1, Some(node1)), // This will update overlay node
            (account_nibbles2, None),        // This will be a removal (no existing node)
        ];

        // Create storage trie updates
        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie

        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_0000_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0000_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create old versions for overlay
        let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000, // Different mask to show it's an old value
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create overlay storage nodes
        let mut overlay_storage_tries = B256Map::default();

        // Overlay for normal storage (storage_address1)
        let overlay_storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1_old.clone())), /* Simulates existing in
                                                                      * overlay */
            ],
        };

        // Overlay for wiped storage (storage_address2)
        let overlay_storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // Existing in overlay
                (storage_nibbles3, Some(storage_node2.clone())), // Also existing in overlay
            ],
        };

        overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
        overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);

        let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);

        // Normal storage trie: one Some (update) and one None (new)
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // This will update overlay node
                (storage_nibbles2, None),                        // This is a new node
            ],
        };

        // Wiped storage trie
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // Updated node from overlay
                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in overlay
                                                                  * storage_nibbles3 is in
                                                                  * overlay
                                                                  * but not updated */
            ],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the changesets WITH OVERLAY
        let num_written =
            provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();

        // Verify number of entries written
        // Account changesets: 2 (one update from overlay, one removal)
        // Storage changesets:
        //   - Normal storage: 2 (one update from overlay, one new)
        //   - Wiped storage: 3 (two updated, one existing from overlay not updated)
        // Total: 2 + 2 + 3 = 7
        assert_eq!(num_written, 7);

        // Verify account changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();

            // Get all entries for this block to see what was written
            let all_entries = cursor
                .walk_dup(Some(block_number), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            // Assert the full value of all_entries in a single assert_eq
            assert_eq!(
                all_entries,
                vec![
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles1),
                            node: Some(node1_old), // Value from overlay, not DB
                        }
                    ),
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles2),
                            node: None,
                        }
                    ),
                ]
            );
        }

        // Verify storage changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            // Check normal storage trie changesets
            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
            let entries1 =
                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries1,
                vec![
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old), // Old value from overlay
                        }
                    ),
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // New node, no previous value
                        }
                    ),
                ]
            );

            // Check wiped storage trie changesets
            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
            let entries2 =
                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries2,
                vec![
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1), // Value from overlay
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // Was not in overlay
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2), /* Existing node from overlay in wiped
                                                        * storage */
                        }
                    ),
                ]
            );
        }

        provider_rw.commit().unwrap();
    }
4178
4179    #[test]
4180    fn test_clear_trie_changesets_from() {
4181        use alloy_primitives::hex_literal::hex;
4182        use reth_db_api::models::BlockNumberHashedAddress;
4183        use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
4184
4185        let factory = create_test_provider_factory();
4186
4187        // Create some test data for different block numbers
4188        let block1 = 100u64;
4189        let block2 = 101u64;
4190        let block3 = 102u64;
4191        let block4 = 103u64;
4192        let block5 = 104u64;
4193
4194        // Create test addresses for storage changesets
4195        let storage_address1 =
4196            B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
4197        let storage_address2 =
4198            B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
4199
4200        // Create test nibbles
4201        let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
4202        let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
4203        let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
4204
4205        // Create test nodes
4206        let node1 = BranchNodeCompact::new(
4207            0b1111_1111_1111_1111,
4208            0b1111_1111_1111_1111,
4209            0b0000_0000_0000_0001,
4210            vec![B256::from(hex!(
4211                "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
4212            ))],
4213            None,
4214        );
4215        let node2 = BranchNodeCompact::new(
4216            0b1111_1111_1111_1110,
4217            0b1111_1111_1111_1110,
4218            0b0000_0000_0000_0010,
4219            vec![B256::from(hex!(
4220                "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
4221            ))],
4222            Some(B256::from(hex!(
4223                "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
4224            ))),
4225        );
4226
4227        // Populate AccountsTrieChangeSets with data across multiple blocks
4228        {
4229            let provider_rw = factory.provider_rw().unwrap();
4230            let mut cursor =
4231                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4232
4233            // Block 100: 2 entries (will be kept - before start block)
4234            cursor
4235                .upsert(
4236                    block1,
4237                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4238                )
4239                .unwrap();
4240            cursor
4241                .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4242                .unwrap();
4243
4244            // Block 101: 3 entries with duplicates (will be deleted - from this block onwards)
4245            cursor
4246                .upsert(
4247                    block2,
4248                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4249                )
4250                .unwrap();
4251            cursor
4252                .upsert(
4253                    block2,
4254                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4255                )
4256                .unwrap(); // duplicate key
4257            cursor
4258                .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4259                .unwrap();
4260
4261            // Block 102: 2 entries (will be deleted - after start block)
4262            cursor
4263                .upsert(
4264                    block3,
4265                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4266                )
4267                .unwrap();
4268            cursor
4269                .upsert(
4270                    block3,
4271                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
4272                )
4273                .unwrap();
4274
4275            // Block 103: 1 entry (will be deleted - after start block)
4276            cursor
4277                .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4278                .unwrap();
4279
4280            // Block 104: 2 entries (will be deleted - after start block)
4281            cursor
4282                .upsert(
4283                    block5,
4284                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4285                )
4286                .unwrap();
4287            cursor
4288                .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4289                .unwrap();
4290
4291            provider_rw.commit().unwrap();
4292        }
4293
4294        // Populate StoragesTrieChangeSets with data across multiple blocks
4295        {
4296            let provider_rw = factory.provider_rw().unwrap();
4297            let mut cursor =
4298                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4299
4300            // Block 100, address1: 2 entries (will be kept - before start block)
4301            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4302            cursor
4303                .upsert(
4304                    key1_block1,
4305                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4306                )
4307                .unwrap();
4308            cursor
4309                .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4310                .unwrap();
4311
4312            // Block 101, address1: 3 entries with duplicates (will be deleted - from this block
4313            // onwards)
4314            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4315            cursor
4316                .upsert(
4317                    key1_block2,
4318                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4319                )
4320                .unwrap();
4321            cursor
4322                .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4323                .unwrap(); // duplicate key
4324            cursor
4325                .upsert(
4326                    key1_block2,
4327                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4328                )
4329                .unwrap();
4330
4331            // Block 102, address2: 2 entries (will be deleted - after start block)
4332            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4333            cursor
4334                .upsert(
4335                    key2_block3,
4336                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4337                )
4338                .unwrap();
4339            cursor
4340                .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4341                .unwrap();
4342
4343            // Block 103, address1: 2 entries with duplicate (will be deleted - after start block)
4344            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4345            cursor
4346                .upsert(
4347                    key1_block4,
4348                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4349                )
4350                .unwrap();
4351            cursor
4352                .upsert(
4353                    key1_block4,
4354                    &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4355                )
4356                .unwrap(); // duplicate key
4357
4358            // Block 104, address2: 2 entries (will be deleted - after start block)
4359            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4360            cursor
4361                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4362                .unwrap();
4363            cursor
4364                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4365                .unwrap();
4366
4367            provider_rw.commit().unwrap();
4368        }
4369
4370        // Clear all changesets from block 101 onwards
4371        {
4372            let provider_rw = factory.provider_rw().unwrap();
4373            provider_rw.clear_trie_changesets_from(block2).unwrap();
4374            provider_rw.commit().unwrap();
4375        }
4376
4377        // Verify AccountsTrieChangeSets after clearing
4378        {
4379            let provider = factory.provider().unwrap();
4380            let mut cursor =
4381                provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4382
4383            // Block 100 should still exist (before range)
4384            let block1_entries = cursor
4385                .walk_dup(Some(block1), None)
4386                .unwrap()
4387                .collect::<Result<Vec<_>, _>>()
4388                .unwrap();
4389            assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4390            assert_eq!(block1_entries[0].0, block1);
4391            assert_eq!(block1_entries[1].0, block1);
4392
4393            // Blocks 101-104 should be deleted
4394            let block2_entries = cursor
4395                .walk_dup(Some(block2), None)
4396                .unwrap()
4397                .collect::<Result<Vec<_>, _>>()
4398                .unwrap();
4399            assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4400
4401            let block3_entries = cursor
4402                .walk_dup(Some(block3), None)
4403                .unwrap()
4404                .collect::<Result<Vec<_>, _>>()
4405                .unwrap();
4406            assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4407
4408            let block4_entries = cursor
4409                .walk_dup(Some(block4), None)
4410                .unwrap()
4411                .collect::<Result<Vec<_>, _>>()
4412                .unwrap();
4413            assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4414
4415            // Block 104 should also be deleted
4416            let block5_entries = cursor
4417                .walk_dup(Some(block5), None)
4418                .unwrap()
4419                .collect::<Result<Vec<_>, _>>()
4420                .unwrap();
4421            assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4422        }
4423
4424        // Verify StoragesTrieChangeSets after clearing
4425        {
4426            let provider = factory.provider().unwrap();
4427            let mut cursor =
4428                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4429
4430            // Block 100 entries should still exist (before range)
4431            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4432            let block1_entries = cursor
4433                .walk_dup(Some(key1_block1), None)
4434                .unwrap()
4435                .collect::<Result<Vec<_>, _>>()
4436                .unwrap();
4437            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4438
4439            // Blocks 101-104 entries should be deleted
4440            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4441            let block2_entries = cursor
4442                .walk_dup(Some(key1_block2), None)
4443                .unwrap()
4444                .collect::<Result<Vec<_>, _>>()
4445                .unwrap();
4446            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4447
4448            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4449            let block3_entries = cursor
4450                .walk_dup(Some(key2_block3), None)
4451                .unwrap()
4452                .collect::<Result<Vec<_>, _>>()
4453                .unwrap();
4454            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4455
4456            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4457            let block4_entries = cursor
4458                .walk_dup(Some(key1_block4), None)
4459                .unwrap()
4460                .collect::<Result<Vec<_>, _>>()
4461                .unwrap();
4462            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4463
4464            // Block 104 entries should also be deleted
4465            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4466            let block5_entries = cursor
4467                .walk_dup(Some(key2_block5), None)
4468                .unwrap()
4469                .collect::<Result<Vec<_>, _>>()
4470                .unwrap();
4471            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4472        }
4473    }
4474
    /// End-to-end test of `write_trie_updates_sorted`.
    ///
    /// Pre-populates the accounts trie and two storage tries, then applies a sorted
    /// update set containing an account update, an account deletion, a new account
    /// node, a storage node insertion + deletion for one address, and a full wipe
    /// (`is_deleted: true`) for another. Finally verifies the table contents and the
    /// reported entry count.
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Pre-populate account trie with data that will be deleted
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            // Add account node that will be deleted
            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            // Add account node that will be updated
            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101, // old state_mask (will be updated)
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Pre-populate storage tries with data
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            // Add storage nodes for address1 (one will be deleted)
            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011, // will be deleted
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            // Add storage nodes for address2 (will be wiped)
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // Create sorted account trie updates: `Some(node)` means upsert, `None` means deletion.
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask (updated)
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
        ];

        // Create sorted storage trie updates
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask (no hashes)
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
            ],
        };

        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true, // Wipe all storage for this address
            storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the sorted trie updates
        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
        // storage deletion = 5
        // NOTE(review): the two entries removed by wiping address2 are apparently not part of
        // this count — confirm against `write_trie_updates_sorted`'s documented return value.
        assert_eq!(num_entries, 5);

        // Verify account trie updates were written correctly
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        // Check first account node was updated
        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        // Check deleted account node no longer exists
        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        // Check new account node exists
        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        // Verify storage trie updates were written correctly
        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        // Check storage for address1: [0x2, 0x0] was deleted, [0x1, 0x0] was inserted
        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        // Check storage for address2 was wiped
        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4694
    /// Tests `get_block_trie_updates(target_block)`.
    ///
    /// Seeds the current trie tables (the "final" state), plus per-block trie changesets
    /// for `target_block` and for `next_block`. The assertions show that the result
    /// contains exactly the keys changed in `target_block`, valued at the state as of
    /// the end of `target_block`: either the current table value, or — when a later
    /// block changed the key again — the old value recorded in the later block's
    /// changeset (the "overlay").
    #[test]
    fn test_get_block_trie_updates() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let target_block = 2u64;
        let next_block = 3u64;

        // Create test nibbles and nodes for accounts
        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
        let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node2 = BranchNodeCompact::new(
            0b0000_0000_1111_1111,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let node3 = BranchNodeCompact::new(
            0b1010_1010_1010_1010,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Pre-populate AccountsTrie with nodes that will be the final state
        {
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
            cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
            // account_nibbles3 will be deleted (not in final state)
        }

        // Insert trie changesets for target_block.
        // A changeset entry holds the PRE-block value: `Some(node)` = key existed with that
        // value before the block; `None` = key was created by the block.
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            // nibbles1 was updated in target_block (old value stored)
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b1111_0000_0000_0000, // old value
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
            // nibbles2 was created in target_block (no old value)
            cursor
                .append_dup(
                    target_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

        // Insert trie changesets for next_block (to test overlay)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
            // nibbles3 was deleted in next_block (old value stored)
            cursor
                .append_dup(
                    next_block,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(account_nibbles3),
                        node: Some(node3),
                    },
                )
                .unwrap();
        }

        // Storage trie updates
        let storage_address1 = B256::from([1u8; 32]);
        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_1111_1111_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0101_0101_0101_0101,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Pre-populate StoragesTrie with final state
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: storage_node1.clone(),
                    },
                )
                .unwrap();
            // storage_nibbles2 was deleted in next_block, so it's not in final state
        }

        // Insert storage trie changesets for target_block (keyed by (block, hashed address))
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((target_block, storage_address1));

            // storage_nibbles1 was updated
            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles1),
                        node: Some(BranchNodeCompact::new(
                            0b0000_0000_1111_1111, // old value
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();

            // storage_nibbles2 was created
            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: None,
                    },
                )
                .unwrap();
        }

        // Insert storage trie changesets for next_block (to test overlay)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
            let key = BlockNumberHashedAddress((next_block, storage_address1));

            // storage_nibbles2 was deleted in next_block
            cursor
                .append_dup(
                    key,
                    TrieChangeSetsEntry {
                        nibbles: StoredNibblesSubKey(storage_nibbles2),
                        node: Some(BranchNodeCompact::new(
                            0b0101_0101_0101_0101, // value that was deleted
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        )),
                    },
                )
                .unwrap();
        }

        provider_rw.commit().unwrap();

        // Now test get_block_trie_updates
        let provider = factory.provider().unwrap();
        let result = provider.get_block_trie_updates(target_block).unwrap();

        // Verify account trie updates
        assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");

        // Check nibbles1 - should have the current value (node1)
        let nibbles1_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles1)
            .expect("Should find nibbles1");
        assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
        assert_eq!(
            nibbles1_update.1.as_ref().unwrap().state_mask,
            node1.state_mask,
            "nibbles1 should have current value"
        );

        // Check nibbles2 - should have the current value (node2)
        let nibbles2_update = result
            .account_nodes_ref()
            .iter()
            .find(|(n, _)| n == &account_nibbles2)
            .expect("Should find nibbles2");
        assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
        assert_eq!(
            nibbles2_update.1.as_ref().unwrap().state_mask,
            node2.state_mask,
            "nibbles2 should have current value"
        );

        // nibbles3 should NOT be in the result (it was changed in next_block, not target_block)
        assert!(
            !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
            "nibbles3 should not be in target_block updates"
        );

        // Verify storage trie updates
        assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
        let storage_updates = result
            .storage_tries_ref()
            .get(&storage_address1)
            .expect("Should have storage updates for address1");

        assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");

        // Check storage_nibbles1 - should have current value
        let storage1_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles1)
            .expect("Should find storage_nibbles1");
        assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
        assert_eq!(
            storage1_update.1.as_ref().unwrap().state_mask,
            storage_node1.state_mask,
            "storage_nibbles1 should have current value"
        );

        // Check storage_nibbles2 - was created in target_block, will be deleted in next_block
        // So it should have a value (the value that will be deleted)
        let storage2_update = storage_updates
            .storage_nodes
            .iter()
            .find(|(n, _)| n == &storage_nibbles2)
            .expect("Should find storage_nibbles2");
        assert!(
            storage2_update.1.is_some(),
            "storage_nibbles2 should have a value (the node that will be deleted in next block)"
        );
        assert_eq!(
            storage2_update.1.as_ref().unwrap().state_mask,
            storage_node2.state_mask,
            "storage_nibbles2 should have the value that was created and will be deleted"
        );
    }
4967
    /// Tests when receipts may be skipped (pruned at write time) versus when they must
    /// be written.
    ///
    /// Legacy mode (receipts in the DB) honors the configured prune mode. Static-file
    /// mode may skip leading receipts, but once the receipts segment contains data,
    /// later receipts must always be written — static files do not support gaps.
    #[test]
    fn test_prunable_receipts_logic() {
        // Helper: inserts random blocks 0..=tip_block, each with `tx_count` transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
                }
            };

        // Helper: writes a single receipt for `block` and commits. The receipt's
        // cumulative_gas_used is set to the block number so assertions can identify it.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block, // identifier to assert against
                    logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
            provider_rw.commit().unwrap();
        };

        // Legacy mode (receipts in DB) - should be prunable
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::legacy();
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            // create chain
            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Block 1 is before the prune cutoff (Before(100)): its receipt is skipped.
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Only the receipt for tip_block - 1 should be readable.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

        // Static files mode
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::legacy().with_receipts_in_static_files(true);
            factory.set_storage_settings_cache(storage_settings);
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            // create chain
            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Attempt to write receipts for block 0 and 1 (should be skipped)
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            // No receipt data was written, but the segment's highest block still advances.
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none(),);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1),);

            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
            // static files.
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2),);

            // After having a receipt already in static files, attempt to skip the next receipt by
            // changing the prune mode. It should NOT skip it and should still write the receipt,
            // since static files do not support gaps.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            // Ensure we can only fetch the 2 last receipts.
            //
            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
            // to the block number it belongs to easily identify and assert.
            let provider = factory.provider().unwrap();
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                // With 1 tx per block, tx number == block number, so `receipt(num)` is the
                // receipt of block `num`.
                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
5105}