//! Database provider implementation (`reth_provider/providers/database/provider.rs`).
1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::Database,
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77    cmp::Ordering,
78    collections::{BTreeMap, BTreeSet},
79    fmt::Debug,
80    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81    path::PathBuf,
82    sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Determines the commit order for database operations.
///
/// The order across backends matters for crash recovery: the backend committed
/// last is the one inspected on restart to detect a partial write.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Normal commit order: static files first, then `RocksDB`, then MDBX.
    #[default]
    Normal,
    /// Unwind commit order: MDBX first, then `RocksDB`, then static files.
    /// Used for unwind operations to allow recovery by truncating static files on restart.
    Unwind,
}
96
97impl CommitOrder {
98    /// Returns true if this is unwind commit order.
99    pub const fn is_unwind(&self) -> bool {
100        matches!(self, Self::Unwind)
101    }
102}
103
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
/// Once that issue is solved, we can probably revert back to being an alias type.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    // Newtype over the RW-transaction provider; access goes through `Deref`/`DerefMut`.
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119    fn deref(&self) -> &Self::Target {
120        &self.0
121    }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125    fn deref_mut(&mut self) -> &mut Self::Target {
126        &mut self.0
127    }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131    for DatabaseProviderRW<DB, N>
132{
133    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134        &self.0
135    }
136}
137
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commit database transaction and static file if it exists.
    ///
    /// Delegates to the wrapped provider's `commit`, consuming `self`.
    pub fn commit(self) -> ProviderResult<()> {
        self.0.commit()
    }

    /// Consume `DbTx` or `DbTxMut`.
    ///
    /// Returns the underlying read-write transaction, dropping the provider wrapper.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    /// Override the minimum pruning distance for testing purposes.
    ///
    /// Only compiled in test builds; production providers are constructed with
    /// `MINIMUM_UNWIND_SAFE_DISTANCE`.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158    for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161        provider.0
162    }
163}
164
/// Mode for [`DatabaseProvider::save_blocks`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Full mode: write block structure + receipts + state + trie.
    /// Used by engine/production code.
    Full,
    /// Blocks only: write block structure (headers, txs, senders, indices).
    /// Receipts/state/trie are skipped - they may come later via separate calls.
    /// Used by `insert_block`.
    BlocksOnly,
}
176
177impl SaveBlocksMode {
178    /// Returns `true` if this is [`SaveBlocksMode::Full`].
179    pub const fn with_state(self) -> bool {
180        matches!(self, Self::Full)
181    }
182}
183
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
    /// Path to the database directory.
    db_path: PathBuf,
    /// Pending `RocksDB` batches to be committed at provider commit time.
    ///
    /// Only populated on unix builds with the `rocksdb` feature; otherwise unused.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Commit order for database operations.
    commit_order: CommitOrder,
    /// Minimum distance from tip required for pruning
    minimum_pruning_distance: u64,
    /// Database provider metrics
    metrics: metrics::DatabaseProviderMetrics,
}
217
218impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        let mut s = f.debug_struct("DatabaseProvider");
221        s.field("tx", &self.tx)
222            .field("chain_spec", &self.chain_spec)
223            .field("static_file_provider", &self.static_file_provider)
224            .field("prune_modes", &self.prune_modes)
225            .field("storage", &self.storage)
226            .field("storage_settings", &self.storage_settings)
227            .field("rocksdb_provider", &self.rocksdb_provider)
228            .field("changeset_cache", &self.changeset_cache)
229            .field("runtime", &self.runtime)
230            .field("pending_rocksdb_batches", &"<pending batches>")
231            .field("commit_order", &self.commit_order)
232            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
233            .finish()
234    }
235}
236
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns reference to prune modes.
    ///
    /// Available for any transaction type since it only reads configuration.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}
243
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// State provider for latest state
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Storage provider for state at that given block hash
    ///
    /// Returns the latest-state provider when the block is the tip; otherwise builds
    /// a historical provider over the changesets applied *after* the block, capped by
    /// any account/storage history prune checkpoints.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        // Fast path: requested block is both the best and the last block, so the
        // latest state *is* the state at this block.
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If we pruned account or storage history, we can't return state on every historical block.
        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    /// Sets the prune modes for provider.
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
300
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// Primitive types are taken directly from the node type configuration.
    type Primitives = N::Primitives;
}
304
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a mutable static-file writer positioned for `block` in the given `segment`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
319
impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
    /// Returns the `RocksDB` provider.
    fn rocksdb_provider(&self) -> RocksDBProvider {
        self.rocksdb_provider.clone()
    }

    /// Queues a write batch to be committed together with this provider.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
        self.pending_rocksdb_batches.lock().push(batch);
    }

    /// Commits all queued batches in the order they were queued.
    ///
    /// The queue is swapped out via `mem::take` so the lock is released before
    /// the (potentially slow) batch commits run.
    #[cfg(all(unix, feature = "rocksdb"))]
    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
        for batch in batches {
            self.rocksdb_provider.commit_batch(batch)?;
        }
        Ok(())
    }
}
340
341impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
342    for DatabaseProvider<TX, N>
343{
344    type ChainSpec = N::ChainSpec;
345
346    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
347        self.chain_spec.clone()
348    }
349}
350
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-write transaction.
    ///
    /// Shared by [`Self::new_rw`] and [`Self::new_unwind_rw`], which differ only in
    /// the [`CommitOrder`] they pass. The pending-batch queue starts empty and the
    /// pruning distance defaults to `MINIMUM_UNWIND_SAFE_DISTANCE`.
    #[allow(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Creates a provider with an inner read-write transaction using normal commit order.
    #[allow(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Normal,
        )
    }

    /// Creates a provider with an inner read-write transaction using unwind commit order.
    ///
    /// Unwind order commits MDBX before static files so a crash mid-unwind can be
    /// recovered by truncating static files on restart (see [`CommitOrder::Unwind`]).
    #[allow(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Unwind,
        )
    }
}
443
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity conversion.
    fn as_ref(&self) -> &Self {
        self
    }
}
449
450impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
    ///
    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling: with the
    /// `rocksdb` feature (on unix) the closure receives a live batch and any raw batch
    /// it returns is queued via [`Self::set_pending_rocksdb_batch`]; without the feature
    /// the closure receives `()` and the returned batch slot is ignored.
    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
    where
        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
    {
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb = self.rocksdb_provider();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_batch = rocksdb.batch();
        #[cfg(not(all(unix, feature = "rocksdb")))]
        let rocksdb_batch = ();

        let (result, raw_batch) = f(rocksdb_batch)?;

        #[cfg(all(unix, feature = "rocksdb"))]
        if let Some(batch) = raw_batch {
            self.set_pending_rocksdb_batch(batch);
        }
        // The `_` pattern neither reads nor moves `raw_batch`, so this line is valid
        // even on rocksdb builds where the `if let` above already consumed the value.
        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled

        Ok(result)
    }
475
    /// Creates the context for static file writes.
    ///
    /// Decides which data categories (senders, receipts, account/storage changesets)
    /// are routed to static files based on the configured writer destinations, the
    /// [`SaveBlocksMode`], and the prune settings.
    fn static_file_write_ctx(
        &self,
        save_mode: SaveBlocksMode,
        first_block: BlockNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<StaticFileWriteCtx> {
        // Tip is the max of what is already stored and the range being written.
        let tip = self.last_block_number()?.max(last_block);
        Ok(StaticFileWriteCtx {
            // Senders go to SF only when that's their destination and sender
            // recovery isn't fully pruned.
            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
            // Receipts and changesets are only written at all in Full mode.
            write_receipts: save_mode.with_state() &&
                EitherWriter::receipts_destination(self).is_static_file(),
            write_account_changesets: save_mode.with_state() &&
                EitherWriterDestination::account_changesets(self).is_static_file(),
            write_storage_changesets: save_mode.with_state() &&
                EitherWriterDestination::storage_changesets(self).is_static_file(),
            tip,
            receipts_prune_mode: self.prune_modes.receipts,
            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
            receipts_prunable: self
                .static_file_provider
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none() &&
                PruneMode::Distance(self.minimum_pruning_distance)
                    .should_prune(first_block, tip),
        })
    }
504
    /// Creates the context for `RocksDB` writes.
    ///
    /// `dead_code` is allowed because callers are cfg-gated to unix + `rocksdb` builds.
    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            // Shared queue: batches produced during the write are committed with the provider.
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
515
    /// Writes executed blocks and state to storage.
    ///
    /// This method parallelizes static file (SF) writes with MDBX writes.
    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
    ///
    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // unwrap is safe: the empty case returned above.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // Compute tx_nums upfront (both threads need these)
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // First tx number for each block: running sum of per-block transaction counts.
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };

        // avoid capturing &self.tx in scope below.
        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_provider = self.rocksdb_provider.clone();
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        #[cfg(all(unix, feature = "rocksdb"))]
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Results from the spawned writer tasks; `None` after the scope means the
        // corresponding task panicked before storing a result.
        let mut sf_result = None;
        #[cfg(all(unix, feature = "rocksdb"))]
        let mut rocksdb_result = None;

        // Write to all backends in parallel.
        let runtime = &self.runtime;
        // Propagate tracing context into rayon-spawned threads so that static file
        // and RocksDB write spans appear as children of save_blocks in traces.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // SF writes
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes
            #[cfg(all(unix, feature = "rocksdb"))]
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // MDBX writes (run on the current thread while the spawned tasks proceed)
            let mdbx_start = Instant::now();

            // Collect all transaction hashes across all blocks, sort them, and write in batch
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sort by hash for optimal MDBX insertion performance
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                // Write all transaction hash numbers in a single batch
                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    // Write state and changesets to the database.
                    // Must be written after blocks because of the receipt lookup.
                    // Skip receipts/account changesets if they're being written to static files.
                    let start = Instant::now();
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            // Write all hashed state and trie updates in single batches.
            // This reduces cursor open/close overhead from N calls to 1.
            if save_mode.with_state() {
                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            // Full mode: update history indices
            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            // Update pipeline progress
            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // Collect results from spawned tasks
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        #[cfg(all(unix, feature = "rocksdb"))]
        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
732
    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
    ///
    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn insert_block_mdbx_only(
        &self,
        block: &RecoveredBlock<BlockTy<N>>,
        first_tx_num: TxNumber,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        // Senders go to MDBX only when sender recovery isn't fully pruned and the
        // configured destination is the database (not static files).
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
            EitherWriterDestination::senders(self).is_database()
        {
            let start = Instant::now();
            // Consecutive tx numbers starting at `first_tx_num`, zipped with the senders.
            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
                cursor.append(tx_num, &sender)?;
            }
            self.metrics
                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
        }

        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        // Block hash -> number lookup.
        let start = Instant::now();
        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
766
767    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
768    /// `Ommers`/`Withdrawals`).
769    fn write_block_body_indices(
770        &self,
771        block_number: BlockNumber,
772        body: &BodyTy<N>,
773        first_tx_num: TxNumber,
774        tx_count: u64,
775    ) -> ProviderResult<()> {
776        // MDBX: BlockBodyIndices
777        let start = Instant::now();
778        self.tx
779            .cursor_write::<tables::BlockBodyIndices>()?
780            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
781        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
782
783        // MDBX: TransactionBlocks (last tx -> block mapping)
784        if tx_count > 0 {
785            let start = Instant::now();
786            self.tx
787                .cursor_write::<tables::TransactionBlocks>()?
788                .append(first_tx_num + tx_count - 1, &block_number)?;
789            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
790        }
791
792        // MDBX: Ommers/Withdrawals
793        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
794
795        Ok(())
796    }
797
    /// Unwinds trie state starting at and including the given block.
    ///
    /// This includes calculating the resulted state root and comparing it with the parent block
    /// state root.
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        // Collect every account touched at or after `from`; these drive both the hashed-state
        // unwind and the history-index unwind below.
        let changed_accounts = self.account_changesets_range(from..)?;

        // Unwind account hashes.
        self.unwind_account_hashing(changed_accounts.iter())?;

        // Unwind account history indices.
        self.unwind_account_history_indices(changed_accounts.iter())?;

        let changed_storages = self.storage_changesets_range(from..)?;

        // Unwind storage hashes.
        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        // Unwind storage history indices.
        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // Unwind accounts/storages trie tables using the revert.
        // Get the database tip block number
        let db_tip_block = self
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            // No `Finish` checkpoint means there is no known tip, hence no changeset range to
            // compute the revert from.
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: from,
                available: 0..=0,
            })?;

        // Fetch (or compute and cache) the trie updates reverting `from..=tip`, then apply
        // them to the accounts/storages trie tables.
        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        Ok(())
    }
835
836    /// Removes receipts from all transactions starting with provided number (inclusive).
837    fn remove_receipts_from(
838        &self,
839        from_tx: TxNumber,
840        last_block: BlockNumber,
841    ) -> ProviderResult<()> {
842        // iterate over block body and remove receipts
843        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
844
845        if EitherWriter::receipts_destination(self).is_static_file() {
846            let static_file_receipt_num =
847                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
848
849            let to_delete = static_file_receipt_num
850                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
851                .unwrap_or_default();
852
853            self.static_file_provider
854                .latest_writer(StaticFileSegment::Receipts)?
855                .prune_receipts(to_delete, last_block)?;
856        }
857
858        Ok(())
859    }
860
861    /// Writes bytecodes to MDBX.
862    fn write_bytecodes(
863        &self,
864        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
865    ) -> ProviderResult<()> {
866        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
867        for (hash, bytecode) in bytecodes {
868            bytecodes_cursor.upsert(hash, &bytecode)?;
869        }
870        Ok(())
871    }
872}
873
874impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
875    fn try_into_history_at_block(
876        self,
877        mut block_number: BlockNumber,
878    ) -> ProviderResult<StateProviderBox> {
879        let best_block = self.best_block_number().unwrap_or_default();
880
881        // Reject requests for blocks beyond the best block
882        if block_number > best_block {
883            return Err(ProviderError::BlockNotExecuted {
884                requested: block_number,
885                executed: best_block,
886            });
887        }
888
889        // If requesting state at the best block, use the latest state provider
890        if block_number == best_block {
891            return Ok(Box::new(LatestStateProvider::new(self)));
892        }
893
894        // +1 as the changeset that we want is the one that was applied after this block.
895        block_number += 1;
896
897        let account_history_prune_checkpoint =
898            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
899        let storage_history_prune_checkpoint =
900            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
901
902        let mut state_provider = HistoricalStateProvider::new(self, block_number);
903
904        // If we pruned account or storage history, we can't return state on every historical block.
905        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
906        if let Some(prune_checkpoint_block_number) =
907            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
908        {
909            state_provider = state_provider.with_lowest_available_account_history_block_number(
910                prune_checkpoint_block_number + 1,
911            );
912        }
913        if let Some(prune_checkpoint_block_number) =
914            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
915        {
916            state_provider = state_provider.with_lowest_available_storage_history_block_number(
917                prune_checkpoint_block_number + 1,
918            );
919        }
920
921        Ok(Box::new(state_provider))
922    }
923}
924
/// For a given key, unwind all history shards that contain block numbers at or above the given
/// block number.
///
/// S - Sharded key subtype.
/// T - Table to walk over.
/// C - Cursor implementation.
///
/// This function walks the entries from the given start key and deletes all shards that belong to
/// the key and contain block numbers at or above the given block number. Shards entirely below
/// the block number are preserved.
///
/// The boundary shard (the shard that spans across the block number) is removed from the database.
/// Any indices that are below the block number are filtered out and returned for reinsertion.
/// The boundary shard is returned for reinsertion (if it's not empty).
///
/// Returns the block numbers (ascending) that the caller should reinsert under the key, or an
/// empty vector when nothing below `block_number` remains.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Start from the given key and iterate through shards
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, break.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Always delete the current shard from the database first
        // We'll decide later what (if anything) to reinsert
        cursor.delete_current()?;

        // Get the first (lowest) block number in this shard
        // All block numbers in a shard are sorted in ascending order
        let first = list.iter().next().expect("List can't be empty");

        // Case 1: Entire shard is at or above the unwinding point
        // Keep it deleted (don't return anything for reinsertion)
        if first >= block_number {
            // Step to the previous table entry — presumably an earlier shard of the same key
            // (shards with lower `highest_block_number` sort before this one; confirm against
            // the ShardedKey ordering).
            item = cursor.prev()?;
            continue
        }
        // Case 2: This is a boundary shard (spans across the unwinding point)
        // The shard contains some blocks below and some at/above the unwinding point
        else if block_number <= sharded_key.as_ref().highest_block_number {
            // Return only the block numbers that are below the unwinding point
            // These will be reinserted to preserve the historical data
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Case 3: Entire shard is below the unwinding point
        // Return all block numbers for reinsertion (preserve entire shard)
        return Ok(list.iter().collect::<Vec<_>>())
    }

    // No shards found or all processed
    Ok(Vec::new())
}
987
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    ///
    /// `tx` is the database transaction, `chain_spec` the chain configuration,
    /// `static_file_provider` the static-file backend, `prune_modes` the configured pruning,
    /// `storage` the chain-specific storage implementation, `storage_settings` the shared
    /// (lockable) storage configuration, `rocksdb_provider` the RocksDB backend,
    /// `changeset_cache` the trie-revert cache, `runtime` the task runtime, and `db_path` the
    /// on-disk database location.
    ///
    /// Remaining fields are initialized to defaults: no pending RocksDB batches, normal commit
    /// order, and the minimum-unwind-safe pruning distance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Consume `DbTx` or `DbTxMut`.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Pass `DbTx` or `DbTxMut` mutable reference.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Pass `DbTx` or `DbTxMut` immutable reference.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1041
1042impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Fetches a single block by hash or number, assembling it from header, body, and senders.
    ///
    /// `header_by_number` loads the header, and `construct_block` builds the final block type
    /// from the header, the decoded body, and the per-transaction senders. Senders are read
    /// from storage when available and recovered from signatures otherwise. Returns `Ok(None)`
    /// when the block (or its body indices) is not present.
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        // Currently unused; kept for interface compatibility with callers.
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // Get the block body
        //
        // If the body indices are not found, this means that the transactions either do not exist
        // in the database yet, or they do exist but are not indexed. If they exist but are not
        // indexed, we don't have enough information to return the block anyways, so we return
        // `None`.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let transactions = if tx_range.is_empty() {
            vec![]
        } else {
            self.transactions_by_tx_range(tx_range.clone())?
        };

        // Decode the body (ommers/withdrawals etc.) via the chain-specific storage reader.
        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        let senders = if tx_range.is_empty() {
            vec![]
        } else {
            // Senders persisted in storage for this tx range; any gaps are recovered from the
            // transaction signatures below.
            let known_senders: HashMap<TxNumber, Address> =
                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

            let mut senders = Vec::with_capacity(body.transactions().len());
            for (tx_num, tx) in tx_range.zip(body.transactions()) {
                match known_senders.get(&tx_num) {
                    None => {
                        let sender = tx.recover_signer_unchecked()?;
                        senders.push(sender);
                    }
                    Some(sender) => senders.push(*sender),
                }
            }
            senders
        };

        construct_block(header, body, senders)
    }
1102
1103    /// Returns a range of blocks from the database.
1104    ///
1105    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1106    /// construct blocks from the following inputs:
1107    ///     – Header
1108    ///     - Range of transaction numbers
1109    ///     – Ommers
1110    ///     – Withdrawals
1111    ///     – Senders
1112    fn block_range<F, H, HF, R>(
1113        &self,
1114        range: RangeInclusive<BlockNumber>,
1115        headers_range: HF,
1116        mut assemble_block: F,
1117    ) -> ProviderResult<Vec<R>>
1118    where
1119        H: AsRef<HeaderTy<N>>,
1120        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1121        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1122    {
1123        if range.is_empty() {
1124            return Ok(Vec::new())
1125        }
1126
1127        let len = range.end().saturating_sub(*range.start()) as usize;
1128        let mut blocks = Vec::with_capacity(len);
1129
1130        let headers = headers_range(range.clone())?;
1131
1132        // If the body indices are not found, this means that the transactions either do
1133        // not exist in the database yet, or they do exit but are
1134        // not indexed. If they exist but are not indexed, we don't
1135        // have enough information to return the block anyways, so
1136        // we skip the block.
1137        let present_headers = self
1138            .block_body_indices_range(range)?
1139            .into_iter()
1140            .map(|b| b.tx_num_range())
1141            .zip(headers)
1142            .collect::<Vec<_>>();
1143
1144        let mut inputs = Vec::with_capacity(present_headers.len());
1145        for (tx_range, header) in &present_headers {
1146            let transactions = if tx_range.is_empty() {
1147                Vec::new()
1148            } else {
1149                self.transactions_by_tx_range(tx_range.clone())?
1150            };
1151
1152            inputs.push((header.as_ref(), transactions));
1153        }
1154
1155        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1156
1157        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1158            blocks.push(assemble_block(header, body, tx_range)?);
1159        }
1160
1161        Ok(blocks)
1162    }
1163
1164    /// Returns a range of blocks from the database, along with the senders of each
1165    /// transaction in the blocks.
1166    ///
1167    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1168    /// construct blocks from the following inputs:
1169    ///     – Header
1170    ///     - Transactions
1171    ///     – Ommers
1172    ///     – Withdrawals
1173    ///     – Senders
1174    fn block_with_senders_range<H, HF, B, BF>(
1175        &self,
1176        range: RangeInclusive<BlockNumber>,
1177        headers_range: HF,
1178        assemble_block: BF,
1179    ) -> ProviderResult<Vec<B>>
1180    where
1181        H: AsRef<HeaderTy<N>>,
1182        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1183        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1184    {
1185        self.block_range(range, headers_range, |header, body, tx_range| {
1186            let senders = if tx_range.is_empty() {
1187                Vec::new()
1188            } else {
1189                let known_senders: HashMap<TxNumber, Address> =
1190                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1191
1192                let mut senders = Vec::with_capacity(body.transactions().len());
1193                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1194                    match known_senders.get(&tx_num) {
1195                        None => {
1196                            // recover the sender from the transaction if not found
1197                            let sender = tx.recover_signer_unchecked()?;
1198                            senders.push(sender);
1199                        }
1200                        Some(sender) => senders.push(*sender),
1201                    }
1202                }
1203
1204                senders
1205            };
1206
1207            assemble_block(header, body, senders)
1208        })
1209    }
1210
    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
    /// account changesets.
    ///
    /// For every changed account/slot this pairs the *old* value (from the changeset) with the
    /// *current* value (from plain state), and records the old values per block in the reverts
    /// structure.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Iterate previous values and read the plain-state value to create the changeset.
        // The double option around Account represents whether the account state is known
        // (first Option) and whether the account is removed (second Option).
        let mut state: BundleStateInit = HashMap::default();

        // This is not working for blocks that are not at tip. as plain state is not the last
        // state of end range. We should rename the functions or add support to access
        // History state. Accessing history state can be tricky but we are not gaining
        // anything.

        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        // Changesets are walked in reverse so the *oldest* `old_info` for each address wins
        // (the last write in reverse order is the earliest change in the range).
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First time we see this address: pair old info with current plain state.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    // Dup-sorted lookup may land on a different subkey; the filter ensures we
                    // only accept an exact key match, defaulting to a zero value otherwise.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1291
    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
    /// plain address and plain storage key.
    fn populate_bundle_state_hashed(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        // Reverse iteration ensures the *oldest* `old_info` in the range ends up in `state`.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // Current value lives in the hashed table, keyed by keccak256(address).
                    let hashed_address = keccak256(address);
                    let new_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let present_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // Storage keys in changesets are plain; hash them for HashedStorages lookup.
            let hashed_storage_key = keccak256(old_storage.key);
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    // Dup-sorted lookup may land on a different subkey; only accept an exact
                    // hashed-key match, defaulting to a zero value otherwise.
                    let new_storage = hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .filter(|storage| storage.key == hashed_storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
1363}
1364
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Insert history index to the database.
    ///
    /// For each updated partial key, this function retrieves the last shard from the database
    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
    /// the shards back into the database.
    ///
    /// This function is used by history indexing stages.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
        // will append duplicate entries instead of updating existing ones, causing data corruption.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // The "last" shard for a key is stored under `u64::MAX` as its highest block number.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            // fast path: all indices fit in one shard
            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // slow path: rechunk into multiple shards
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                // Non-final shards are keyed by their own highest block number; the final shard
                // keeps the `u64::MAX` sentinel so future appends can find it.
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Insert last list with `u64::MAX`.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
1423
1424impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1425    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1426        if self.cached_storage_settings().use_hashed_state() {
1427            let hashed_address = keccak256(address);
1428            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1429        } else {
1430            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1431        }
1432    }
1433}
1434
impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        // `EitherReader` picks the correct changeset source (MDBX or static files).
        let mut reader = EitherReader::new_account_changesets(self)?;

        reader.changed_accounts_with_range(range)
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        // Look each address up in whichever table the current settings use; results keep the
        // original (plain) address as the key either way.
        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
            Ok(iter
                .into_iter()
                .map(|address| {
                    let hashed_address = keccak256(address);
                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
                })
                .collect::<Result<Vec<_>, _>>()?)
        } else {
            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
            Ok(iter
                .into_iter()
                .map(|address| {
                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
                })
                .collect::<Result<Vec<_>, _>>()?)
        }
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let highest_static_block = self
            .static_file_provider
            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);

        if let Some(highest) = highest_static_block &&
            self.cached_storage_settings().storage_v2
        {
            let start = *range.start();
            let static_end = (*range.end()).min(highest);

            // NOTE(review): in this branch only blocks up to `static_end` are read — if
            // `range.end()` exceeds the highest static-file block, the remainder is silently
            // skipped with no DB fallback. Confirm callers never request past the static tip
            // in storage_v2 mode.
            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
            if start <= static_end {
                for block in start..=static_end {
                    let block_changesets = self.account_block_changeset(block)?;
                    for changeset in block_changesets {
                        changed_accounts_and_blocks
                            .entry(changeset.address)
                            .or_default()
                            .push(block);
                    }
                }
            }

            Ok(changed_accounts_and_blocks)
        } else {
            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

            // Fold the changeset walk into address -> [block numbers] (ascending, since the
            // walk is in key order).
            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
                BTreeMap::new(),
                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, account) = entry?;
                    accounts.entry(account.address).or_default().push(index);
                    Ok(accounts)
                },
            )?;

            Ok(account_transitions)
        }
    }
}
1513
1514impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1515    fn storage_changeset(
1516        &self,
1517        block_number: BlockNumber,
1518    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1519        if self.cached_storage_settings().storage_v2 {
1520            self.static_file_provider.storage_changeset(block_number)
1521        } else {
1522            let range = block_number..=block_number;
1523            let storage_range = BlockNumberAddress::range(range);
1524            self.tx
1525                .cursor_dup_read::<tables::StorageChangeSets>()?
1526                .walk_range(storage_range)?
1527                .map(|r| {
1528                    let (bna, entry) = r?;
1529                    Ok((bna, entry))
1530                })
1531                .collect()
1532        }
1533    }
1534
1535    fn get_storage_before_block(
1536        &self,
1537        block_number: BlockNumber,
1538        address: Address,
1539        storage_key: B256,
1540    ) -> ProviderResult<Option<StorageEntry>> {
1541        if self.cached_storage_settings().storage_v2 {
1542            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1543        } else {
1544            Ok(self
1545                .tx
1546                .cursor_dup_read::<tables::StorageChangeSets>()?
1547                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1548                .filter(|entry| entry.key == storage_key))
1549        }
1550    }
1551
1552    fn storage_changesets_range(
1553        &self,
1554        range: impl RangeBounds<BlockNumber>,
1555    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1556        if self.cached_storage_settings().storage_v2 {
1557            self.static_file_provider.storage_changesets_range(range)
1558        } else {
1559            self.tx
1560                .cursor_dup_read::<tables::StorageChangeSets>()?
1561                .walk_range(BlockNumberAddressRange::from(range))?
1562                .map(|r| {
1563                    let (bna, entry) = r?;
1564                    Ok((bna, entry))
1565                })
1566                .collect()
1567        }
1568    }
1569
1570    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1571        if self.cached_storage_settings().storage_v2 {
1572            self.static_file_provider.storage_changeset_count()
1573        } else {
1574            Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1575        }
1576    }
1577}
1578
1579impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1580    fn account_block_changeset(
1581        &self,
1582        block_number: BlockNumber,
1583    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1584        if self.cached_storage_settings().storage_v2 {
1585            let static_changesets =
1586                self.static_file_provider.account_block_changeset(block_number)?;
1587            Ok(static_changesets)
1588        } else {
1589            let range = block_number..=block_number;
1590            self.tx
1591                .cursor_read::<tables::AccountChangeSets>()?
1592                .walk_range(range)?
1593                .map(|result| -> ProviderResult<_> {
1594                    let (_, account_before) = result?;
1595                    Ok(account_before)
1596                })
1597                .collect()
1598        }
1599    }
1600
1601    fn get_account_before_block(
1602        &self,
1603        block_number: BlockNumber,
1604        address: Address,
1605    ) -> ProviderResult<Option<AccountBeforeTx>> {
1606        if self.cached_storage_settings().storage_v2 {
1607            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1608        } else {
1609            self.tx
1610                .cursor_dup_read::<tables::AccountChangeSets>()?
1611                .seek_by_key_subkey(block_number, address)?
1612                .filter(|acc| acc.address == address)
1613                .map(Ok)
1614                .transpose()
1615        }
1616    }
1617
1618    fn account_changesets_range(
1619        &self,
1620        range: impl core::ops::RangeBounds<BlockNumber>,
1621    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1622        if self.cached_storage_settings().storage_v2 {
1623            self.static_file_provider.account_changesets_range(range)
1624        } else {
1625            self.tx
1626                .cursor_read::<tables::AccountChangeSets>()?
1627                .walk_range(to_range(range))?
1628                .map(|r| r.map_err(Into::into))
1629                .collect()
1630        }
1631    }
1632
1633    fn account_changeset_count(&self) -> ProviderResult<usize> {
1634        // check if account changesets are in static files, otherwise just count the changeset
1635        // entries in the DB
1636        if self.cached_storage_settings().storage_v2 {
1637            self.static_file_provider.account_changeset_count()
1638        } else {
1639            Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1640        }
1641    }
1642}
1643
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the sealed header of the local chain tip at `highest_uninterrupted_block`,
    /// first reconciling any database <-> static-file height mismatch left behind by an
    /// interrupted shutdown.
    ///
    /// If the Headers static files are *ahead* of the database, the excess headers are
    /// pruned and committed immediately. If they are *behind*, the files are missing or
    /// corrupted and a [`ProviderError::HeaderNotFound`] is returned.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure Headers static file is at the same height. If it's further, this
        // input execution was interrupted previously and we need to unwind the static file.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The node shutdown between an executed static file commit and before the database
            // commit, so we need to unwind the static files.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                // Drop exactly the headers that were committed past the database's view.
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Since this is a database <-> static file inconsistency, we commit the change
                // straight away.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There's either missing or corrupted files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        // After reconciliation the tip header must exist in the static files.
        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1688
1689impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1690    type Header = HeaderTy<N>;
1691
1692    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1693        if let Some(num) = self.block_number(block_hash)? {
1694            Ok(self.header_by_number(num)?)
1695        } else {
1696            Ok(None)
1697        }
1698    }
1699
1700    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1701        self.static_file_provider.header_by_number(num)
1702    }
1703
1704    fn headers_range(
1705        &self,
1706        range: impl RangeBounds<BlockNumber>,
1707    ) -> ProviderResult<Vec<Self::Header>> {
1708        self.static_file_provider.headers_range(range)
1709    }
1710
1711    fn sealed_header(
1712        &self,
1713        number: BlockNumber,
1714    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1715        self.static_file_provider.sealed_header(number)
1716    }
1717
1718    fn sealed_headers_while(
1719        &self,
1720        range: impl RangeBounds<BlockNumber>,
1721        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1722    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1723        self.static_file_provider.sealed_headers_while(range, predicate)
1724    }
1725}
1726
1727impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1728    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1729        self.static_file_provider.block_hash(number)
1730    }
1731
1732    fn canonical_hashes_range(
1733        &self,
1734        start: BlockNumber,
1735        end: BlockNumber,
1736    ) -> ProviderResult<Vec<B256>> {
1737        self.static_file_provider.canonical_hashes_range(start, end)
1738    }
1739}
1740
1741impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1742    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1743        let best_number = self.best_block_number()?;
1744        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1745        Ok(ChainInfo { best_hash, best_number })
1746    }
1747
1748    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1749        // The best block number is tracked via the finished stage which gets updated in the same tx
1750        // when new blocks committed
1751        Ok(self
1752            .get_stage_checkpoint(StageId::Finish)?
1753            .map(|checkpoint| checkpoint.block_number)
1754            .unwrap_or_default())
1755    }
1756
1757    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1758        self.static_file_provider.last_block_number()
1759    }
1760
1761    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1762        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1763    }
1764}
1765
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Returns the block only when the requested source includes the canonical chain; the
    /// database provider never serves pending/non-canonical blocks.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Returns the block with matching number from database.
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    ///
    /// Returns an error if the requested block is below the earliest available history.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            // Blocks below this height have been expired/pruned from history.
            let earliest_available = self.static_file_provider.earliest_history_height();
            if number < earliest_available {
                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
            }

            let Some(header) = self.header_by_number(number)? else { return Ok(None) };

            // If the body indices are not found, this means that the transactions either do not
            // exist in the database yet, or they do exit but are not indexed.
            // If they exist but are not indexed, we don't have enough
            // information to return the block anyways, so we return `None`.
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            // Assemble the body via the node's configured block storage implementation.
            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// The database provider has no notion of a pending block; always `None`.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// The database provider has no notion of a pending block; always `None`.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block with senders with matching number or hash from database.
    ///
    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
    /// spot, and we want fast querying.**
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Dispatches to the inherent `recovered_block` helper (same name, higher arity),
        // supplying the header lookup and the block-assembly closure.
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Like [`Self::recovered_block`], but assembles from a *sealed* header so the block hash
    /// is carried through instead of recomputed.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns all blocks in the inclusive range via the inherent `block_range` helper
    /// (same name, higher arity).
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Range variant of [`Self::recovered_block`]; sender recovery skips the signature `s`
    /// value check (unchecked recovery).
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Range variant built from sealed headers; uses *checked* sender recovery, unlike
    /// [`Self::block_with_senders_range`].
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the number of the block containing transaction `id` by seeking the
    /// `TransactionBlocks` index.
    // NOTE(review): relies on cursor `seek` semantics over the `TransactionBlocks` key
    // layout (first key >= `id`) — confirm against the table definition.
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1924
1925impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1926    for DatabaseProvider<TX, N>
1927{
1928    /// Recovers transaction hashes by walking through `Transactions` table and
1929    /// calculating them in a parallel manner. Returned unsorted.
1930    fn transaction_hashes_by_range(
1931        &self,
1932        tx_range: Range<TxNumber>,
1933    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1934        self.static_file_provider.transaction_hashes_by_range(tx_range)
1935    }
1936}
1937
1938// Calculates the hash of the given transaction
1939impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1940    type Transaction = TxTy<N>;
1941
1942    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1943        self.with_rocksdb_tx(|tx_ref| {
1944            let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1945            reader.get_transaction_hash_number(tx_hash)
1946        })
1947    }
1948
1949    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1950        self.static_file_provider.transaction_by_id(id)
1951    }
1952
1953    fn transaction_by_id_unhashed(
1954        &self,
1955        id: TxNumber,
1956    ) -> ProviderResult<Option<Self::Transaction>> {
1957        self.static_file_provider.transaction_by_id_unhashed(id)
1958    }
1959
1960    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1961        if let Some(id) = self.transaction_id(hash)? {
1962            Ok(self.transaction_by_id_unhashed(id)?)
1963        } else {
1964            Ok(None)
1965        }
1966    }
1967
1968    fn transaction_by_hash_with_meta(
1969        &self,
1970        tx_hash: TxHash,
1971    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1972        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1973            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1974            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1975            let Some(sealed_header) = self.sealed_header(block_number)?
1976        {
1977            let (header, block_hash) = sealed_header.split();
1978            if let Some(block_body) = self.block_body_indices(block_number)? {
1979                // the index of the tx in the block is the offset:
1980                // len([start..tx_id])
1981                // NOTE: `transaction_id` is always `>=` the block's first
1982                // index
1983                let index = transaction_id - block_body.first_tx_num();
1984
1985                let meta = TransactionMeta {
1986                    tx_hash,
1987                    index,
1988                    block_hash,
1989                    block_number,
1990                    base_fee: header.base_fee_per_gas(),
1991                    excess_blob_gas: header.excess_blob_gas(),
1992                    timestamp: header.timestamp(),
1993                };
1994
1995                return Ok(Some((transaction, meta)))
1996            }
1997        }
1998
1999        Ok(None)
2000    }
2001
2002    fn transactions_by_block(
2003        &self,
2004        id: BlockHashOrNumber,
2005    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2006        if let Some(block_number) = self.convert_hash_or_number(id)? &&
2007            let Some(body) = self.block_body_indices(block_number)?
2008        {
2009            let tx_range = body.tx_num_range();
2010            return if tx_range.is_empty() {
2011                Ok(Some(Vec::new()))
2012            } else {
2013                self.transactions_by_tx_range(tx_range).map(Some)
2014            }
2015        }
2016        Ok(None)
2017    }
2018
2019    fn transactions_by_block_range(
2020        &self,
2021        range: impl RangeBounds<BlockNumber>,
2022    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2023        let range = to_range(range);
2024
2025        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2026            .into_iter()
2027            .map(|body| {
2028                let tx_num_range = body.tx_num_range();
2029                if tx_num_range.is_empty() {
2030                    Ok(Vec::new())
2031                } else {
2032                    self.transactions_by_tx_range(tx_num_range)
2033                }
2034            })
2035            .collect()
2036    }
2037
2038    fn transactions_by_tx_range(
2039        &self,
2040        range: impl RangeBounds<TxNumber>,
2041    ) -> ProviderResult<Vec<Self::Transaction>> {
2042        self.static_file_provider.transactions_by_tx_range(range)
2043    }
2044
2045    fn senders_by_tx_range(
2046        &self,
2047        range: impl RangeBounds<TxNumber>,
2048    ) -> ProviderResult<Vec<Address>> {
2049        if EitherWriterDestination::senders(self).is_static_file() {
2050            self.static_file_provider.senders_by_tx_range(range)
2051        } else {
2052            self.cursor_read_collect::<tables::TransactionSenders>(range)
2053        }
2054    }
2055
2056    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2057        if EitherWriterDestination::senders(self).is_static_file() {
2058            self.static_file_provider.transaction_sender(id)
2059        } else {
2060            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2061        }
2062    }
2063}
2064
impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    /// Reads the receipt for `id`, preferring static files and falling back to the
    /// `Receipts` database table for ids outside the static file range.
    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    /// Resolves `hash` to a transaction number, then fetches its receipt.
    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    /// Returns all receipts of the block, or `None` if the block's body indices are not
    /// indexed yet. A known-empty block yields `Some(vec![])`.
    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? &&
            let Some(body) = self.block_body_indices(number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.receipts_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Reads receipts for the tx-number range, splitting the work between static files and
    /// the database table as needed.
    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }

    /// Returns receipts for every block in `block_range`, one `Vec` per block.
    ///
    /// Fetches all receipts for the overall tx span in one go, then distributes them to
    /// blocks by consuming `tx_count` receipts per block in order.
    // NOTE(review): this assumes tx numbers are contiguous across the fetched span so that
    // sequential `take(tx_count)` lines up with per-block boundaries — confirm invariant.
    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        if block_range.is_empty() {
            return Ok(Vec::new());
        }

        // collect block body indices for each block in the range
        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
        let mut block_body_indices = Vec::with_capacity(range_len);
        for block_num in block_range {
            if let Some(indices) = self.block_body_indices(block_num)? {
                block_body_indices.push(indices);
            } else {
                // use default indices for missing blocks (empty block)
                block_body_indices.push(StoredBlockBodyIndices::default());
            }
        }

        if block_body_indices.is_empty() {
            return Ok(Vec::new());
        }

        // find blocks with transactions to determine transaction range
        let non_empty_blocks: Vec<_> =
            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();

        if non_empty_blocks.is_empty() {
            // all blocks are empty
            return Ok(vec![Vec::new(); block_body_indices.len()]);
        }

        // calculate the overall transaction range
        let first_tx = non_empty_blocks[0].first_tx_num();
        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();

        // fetch all receipts in the transaction range
        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
        let mut receipts_iter = all_receipts.into_iter();

        // distribute receipts to their respective blocks
        let mut result = Vec::with_capacity(block_body_indices.len());
        for indices in &block_body_indices {
            if indices.tx_count == 0 {
                result.push(Vec::new());
            } else {
                // consume exactly this block's receipts from the shared iterator
                let block_receipts =
                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
                result.push(block_receipts);
            }
        }

        Ok(result)
    }
}
2171
2172impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2173    for DatabaseProvider<TX, N>
2174{
2175    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2176        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2177    }
2178
2179    fn block_body_indices_range(
2180        &self,
2181        range: RangeInclusive<BlockNumber>,
2182    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2183        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2184    }
2185}
2186
2187impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2188    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2189        Ok(if let Some(encoded) = id.get_pre_encoded() {
2190            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2191        } else {
2192            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2193        })
2194    }
2195
2196    /// Get stage checkpoint progress.
2197    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2198        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2199    }
2200
2201    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2202        self.tx
2203            .cursor_read::<tables::StageCheckpoints>()?
2204            .walk(None)?
2205            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2206            .map_err(ProviderError::Database)
2207    }
2208}
2209
2210impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2211    /// Save stage checkpoint.
2212    fn save_stage_checkpoint(
2213        &self,
2214        id: StageId,
2215        checkpoint: StageCheckpoint,
2216    ) -> ProviderResult<()> {
2217        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2218    }
2219
2220    /// Save stage checkpoint progress.
2221    fn save_stage_checkpoint_progress(
2222        &self,
2223        id: StageId,
2224        checkpoint: Vec<u8>,
2225    ) -> ProviderResult<()> {
2226        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2227    }
2228
2229    #[instrument(level = "debug", target = "providers::db", skip_all)]
2230    fn update_pipeline_stages(
2231        &self,
2232        block_number: BlockNumber,
2233        drop_stage_checkpoint: bool,
2234    ) -> ProviderResult<()> {
2235        // iterate over all existing stages in the table and update its progress.
2236        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2237        for stage_id in StageId::ALL {
2238            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2239            cursor.upsert(
2240                stage_id.to_string(),
2241                &StageCheckpoint {
2242                    block_number,
2243                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2244                },
2245            )?;
2246        }
2247
2248        Ok(())
2249    }
2250}
2251
2252impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2253    fn plain_state_storages(
2254        &self,
2255        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2256    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2257        if self.cached_storage_settings().use_hashed_state() {
2258            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2259
2260            addresses_with_keys
2261                .into_iter()
2262                .map(|(address, storage)| {
2263                    let hashed_address = keccak256(address);
2264                    storage
2265                        .into_iter()
2266                        .map(|key| -> ProviderResult<_> {
2267                            let hashed_key = keccak256(key);
2268                            let value = hashed_storage
2269                                .seek_by_key_subkey(hashed_address, hashed_key)?
2270                                .filter(|v| v.key == hashed_key)
2271                                .map(|v| v.value)
2272                                .unwrap_or_default();
2273                            Ok(StorageEntry { key, value })
2274                        })
2275                        .collect::<ProviderResult<Vec<_>>>()
2276                        .map(|storage| (address, storage))
2277                })
2278                .collect::<ProviderResult<Vec<(_, _)>>>()
2279        } else {
2280            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2281
2282            addresses_with_keys
2283                .into_iter()
2284                .map(|(address, storage)| {
2285                    storage
2286                        .into_iter()
2287                        .map(|key| -> ProviderResult<_> {
2288                            Ok(plain_storage
2289                                .seek_by_key_subkey(address, key)?
2290                                .filter(|v| v.key == key)
2291                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2292                        })
2293                        .collect::<ProviderResult<Vec<_>>>()
2294                        .map(|storage| (address, storage))
2295                })
2296                .collect::<ProviderResult<Vec<(_, _)>>>()
2297        }
2298    }
2299
2300    fn changed_storages_with_range(
2301        &self,
2302        range: RangeInclusive<BlockNumber>,
2303    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2304        if self.cached_storage_settings().storage_v2 {
2305            self.storage_changesets_range(range)?.into_iter().try_fold(
2306                BTreeMap::new(),
2307                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2308                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2309                    accounts.entry(address).or_default().insert(storage_entry.key);
2310                    Ok(accounts)
2311                },
2312            )
2313        } else {
2314            self.tx
2315                .cursor_read::<tables::StorageChangeSets>()?
2316                .walk_range(BlockNumberAddress::range(range))?
2317                // fold all storages and save its old state so we can remove it from HashedStorage
2318                // it is needed as it is dup table.
2319                .try_fold(
2320                    BTreeMap::new(),
2321                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2322                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2323                        accounts.entry(address).or_default().insert(storage_entry.key);
2324                        Ok(accounts)
2325                    },
2326                )
2327        }
2328    }
2329
2330    fn changed_storages_and_blocks_with_range(
2331        &self,
2332        range: RangeInclusive<BlockNumber>,
2333    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2334        if self.cached_storage_settings().storage_v2 {
2335            self.storage_changesets_range(range)?.into_iter().try_fold(
2336                BTreeMap::new(),
2337                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2338                    storages
2339                        .entry((index.address(), storage.key))
2340                        .or_default()
2341                        .push(index.block_number());
2342                    Ok(storages)
2343                },
2344            )
2345        } else {
2346            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2347
2348            let storage_changeset_lists =
2349                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2350                    BTreeMap::new(),
2351                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2352                     entry|
2353                     -> ProviderResult<_> {
2354                        let (index, storage) = entry?;
2355                        storages
2356                            .entry((index.address(), storage.key))
2357                            .or_default()
2358                            .push(index.block_number());
2359                        Ok(storages)
2360                    },
2361                )?;
2362
2363            Ok(storage_changeset_lists)
2364        }
2365    }
2366}
2367
2368impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2369    for DatabaseProvider<TX, N>
2370{
    // Receipt type written by this provider, derived from the node's primitive types.
    type Receipt = ReceiptTy<N>;
2372
    /// Writes an execution outcome's state to storage: reverts (changesets), plain state
    /// changes, bytecodes, and — when `config.write_receipts` is set — receipts.
    ///
    /// Receipt writes honor the configured prune modes and the contract-log retention
    /// filter. Fails with [`ProviderError::BlockBodyIndicesNotFound`] if any block in the
    /// outcome's range is missing its body indices.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            // In storage v2 with all outputs directed to static files, plain state and changesets
            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // Treat the end of the written range as the tip if it is ahead of the stored one;
        // `tip` anchors all prune-distance checks below.
        let tip = self.last_block_number()?.max(last_block);

        // Fetch the first transaction number for each block in the range
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Ensure all expected blocks are present.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // All receipts from the last 128 blocks are required for blockchain tree, even with
        // [`PruneSegment::ContractLogs`].
        //
        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
        // files yet. Once receipts exist in static files, we must continue writing to maintain
        // continuity and have no gaps.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Prepare set of addresses which logs should not be pruned.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Increment block number for receipts static file writer
            receipts_writer.increment_block(block_number)?;

            // Skip writing receipts if pruning configuration requires us to.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // If there are new addresses to retain after this block number, track them
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                // Absolute tx number: the block's first tx number plus position within it.
                let receipt_idx = first_tx_index + idx as u64;
                // Skip writing receipt if log filter is active and it does not have any logs to
                // retain
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2491
    /// Writes storage and account changesets for the reverts of each block, starting at
    /// `first_block`; `config` flags gate which changeset kinds are written.
    ///
    /// Reverts are indexed by position: block `first_block + i` takes the `i`-th entry of
    /// `reverts.storage` / `reverts.accounts`.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        // Write storage changes
        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // sort changes by address.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                        .collect::<Vec<_>>();
                    // sort storage slots by key.
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // If we are writing the primary storage wipe transition, the pre-existing
                    // storage state has to be taken from the database and written to storage
                    // history. See [StorageWipe::Primary] for more details.
                    //
                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
                    // require collecting wiped entries into a Vec like this, see
                    // `write_storage_trie_changesets`.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        // Walk all dup values for this address to capture the full pre-wipe state.
                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // `StorageRevertsIter` combines the revert values with the wiped
                    // pre-state entries (both are sorted by slot key).
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        // Write account changes
        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
2570
    /// Writes the post-state to storage: plain account/storage tables plus bytecodes.
    ///
    /// When hashed state (storage v2) is enabled the plain state tables are skipped
    /// entirely and only bytecodes are written here.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        // sort all entries so they can be written to database in more performant way.
        // and take smaller memory footprint.
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        if !self.cached_storage_settings().use_hashed_state() {
            // Write new account state
            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            // write account to database.
            for (address, account) in changes.accounts {
                if let Some(account) = account {
                    tracing::trace!(?address, "Updating plain state account");
                    accounts_cursor.upsert(address, &account.into())?;
                } else if accounts_cursor.seek_exact(address)?.is_some() {
                    // `None` means the account was destroyed; remove it if present.
                    tracing::trace!(?address, "Deleting plain state account");
                    accounts_cursor.delete_current()?;
                }
            }

            // Write new storage state and wipe storage if needed.
            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
                // Wiping of storage.
                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                    storages_cursor.delete_current_duplicates()?;
                }
                // cast storages to B256.
                let mut storage = storage
                    .into_iter()
                    .map(|(k, value)| StorageEntry { key: k.into(), value })
                    .collect::<Vec<_>>();
                // sort storage slots by key.
                storage.par_sort_unstable_by_key(|a| a.key);

                for entry in storage {
                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                    // Dup tables can't overwrite a subkey in place: delete the existing
                    // entry for this slot (exact key match only), then re-insert below.
                    if let Some(db_entry) =
                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                        db_entry.key == entry.key
                    {
                        storages_cursor.delete_current()?;
                    }

                    // Zero values are deletions, so nothing is re-inserted for them.
                    if !entry.value.is_zero() {
                        storages_cursor.upsert(address, &entry)?;
                    }
                }
            }
        }

        // Write bytecode
        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        self.write_bytecodes(
            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
        )?;

        Ok(())
    }
2634
    /// Writes sorted hashed post-state to the `HashedAccounts` and `HashedStorages` tables.
    ///
    /// Accounts with a `None` value are deleted. Wiped storages have all their dup
    /// entries removed before the new slot values are inserted; zero-valued slots are
    /// deletions and are not re-inserted.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        // Write hashed account updates.
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Write hashed storage changes.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage drops every existing dup entry for the address first.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Dup tables can't overwrite a subkey in place: delete the existing entry
                // for this slot (exact key match only), then re-insert below.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }
2674
    /// Remove the last N blocks of state.
    ///
    /// The latest state will be unwound
    ///
    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
    ///    transaction ids.
    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
    ///    the changesets.
    ///    - In order to have both the old and new values in the changesets, we also access the
    ///      plain state tables.
    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
    ///    we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the plain state
    ///     3. Save the old value to the local state
    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
    ///    have seen before we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the local state
    ///     3. Set the local state to the value in the changeset
    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        // Everything strictly above `block` is unwound.
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        // We are not removing block meta as it is used to get block changesets.
        let block_bodies = self.block_body_indices_range(range.clone())?;

        // First tx number of the unwound range; used below to remove receipts from it.
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // Storage v2 keeps changesets in static files: read the range first, then prune the
        // static file back to `block`. Legacy nodes `take` them from the MDBX tables instead.
        let storage_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.storage_changesets_range(range.clone())?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
        };
        let account_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.account_changesets_range(range)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;
            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        if self.cached_storage_settings().use_hashed_state() {
            // Storage v2: restore the hashed state tables from the reconstructed old values.
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            let (state, _) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        // No pre-existing account: the unwind deletes it.
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let hashed_storage_key = keccak256(storage_key);
                    let storage_entry =
                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
                    // Dup table: drop the current entry for this slot (exact key match only)
                    // before re-inserting the old value.
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .is_some_and(|s| s.key == hashed_storage_key)
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    // A zero old value means the slot did not exist before; leave it deleted.
                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }
        } else {
            // This is not working for blocks that are not at tip. as plain state is not the last
            // state of end range. We should rename the functions or add support to access
            // History state. Accessing history state can be tricky but we are not gaining
            // anything.
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, _) = self.populate_bundle_state(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        // No pre-existing account: the unwind deletes it.
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    // Dup table: drop the current entry for this slot (exact key match only)
                    // before re-inserting the old value.
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .is_some_and(|s| s.key == *storage_key)
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    // A zero old value means the slot did not exist before; leave it deleted.
                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }
        }

        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }
2816
2817    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2818    ///
2819    /// The latest state will be unwound and returned back with all the blocks
2820    ///
2821    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2822    ///    transaction ids.
2823    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2824    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2825    ///    the changesets.
2826    ///    - In order to have both the old and new values in the changesets, we also access the
2827    ///      plain state tables.
2828    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2829    ///    we:
2830    ///     1. Take the old value from the changeset
2831    ///     2. Take the new value from the plain state
2832    ///     3. Save the old value to the local state
2833    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2834    ///    have seen before we:
2835    ///     1. Take the old value from the changeset
2836    ///     2. Take the new value from the local state
2837    ///     3. Set the local state to the value in the changeset
2838    fn take_state_above(
2839        &self,
2840        block: BlockNumber,
2841    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2842        let range = block + 1..=self.last_block_number()?;
2843
2844        if range.is_empty() {
2845            return Ok(ExecutionOutcome::default())
2846        }
2847        let start_block_number = *range.start();
2848
2849        // We are not removing block meta as it is used to get block changesets.
2850        let block_bodies = self.block_body_indices_range(range.clone())?;
2851
2852        // get transaction receipts
2853        let from_transaction_num =
2854            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2855        let to_transaction_num =
2856            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2857
2858        let storage_range = BlockNumberAddress::range(range.clone());
2859        let storage_changeset = if let Some(highest_block) = self
2860            .static_file_provider
2861            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2862            self.cached_storage_settings().storage_v2
2863        {
2864            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2865            let mut changeset_writer =
2866                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2867            changeset_writer.prune_storage_changesets(block)?;
2868            changesets
2869        } else {
2870            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2871        };
2872
2873        // if there are static files for this segment, prune them.
2874        let highest_changeset_block = self
2875            .static_file_provider
2876            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2877        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2878            self.cached_storage_settings().storage_v2
2879        {
2880            // TODO: add a `take` method that removes and returns the items instead of doing this
2881            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2882            let mut changeset_writer =
2883                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2884            changeset_writer.prune_account_changesets(block)?;
2885
2886            changesets
2887        } else {
2888            // Have to remove from static files if they exist, otherwise remove using `take` for the
2889            // changeset tables
2890            self.take::<tables::AccountChangeSets>(range)?
2891        };
2892
2893        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2894            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2895            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2896
2897            let (state, reverts) = self.populate_bundle_state_hashed(
2898                account_changeset,
2899                storage_changeset,
2900                &mut hashed_accounts_cursor,
2901                &mut hashed_storage_cursor,
2902            )?;
2903
2904            for (address, (old_account, new_account, storage)) in &state {
2905                if old_account != new_account {
2906                    let hashed_address = keccak256(address);
2907                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2908                    if let Some(account) = old_account {
2909                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2910                    } else if existing_entry.is_some() {
2911                        hashed_accounts_cursor.delete_current()?;
2912                    }
2913                }
2914
2915                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2916                    let hashed_address = keccak256(address);
2917                    let hashed_storage_key = keccak256(storage_key);
2918                    let storage_entry =
2919                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2920                    if hashed_storage_cursor
2921                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2922                        .is_some_and(|s| s.key == hashed_storage_key)
2923                    {
2924                        hashed_storage_cursor.delete_current()?
2925                    }
2926
2927                    if !old_storage_value.is_zero() {
2928                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2929                    }
2930                }
2931            }
2932
2933            (state, reverts)
2934        } else {
2935            // This is not working for blocks that are not at tip. as plain state is not the last
2936            // state of end range. We should rename the functions or add support to access
2937            // History state. Accessing history state can be tricky but we are not gaining
2938            // anything.
2939            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2940            let mut plain_storage_cursor =
2941                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2942
2943            let (state, reverts) = self.populate_bundle_state(
2944                account_changeset,
2945                storage_changeset,
2946                &mut plain_accounts_cursor,
2947                &mut plain_storage_cursor,
2948            )?;
2949
2950            for (address, (old_account, new_account, storage)) in &state {
2951                if old_account != new_account {
2952                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2953                    if let Some(account) = old_account {
2954                        plain_accounts_cursor.upsert(*address, account)?;
2955                    } else if existing_entry.is_some() {
2956                        plain_accounts_cursor.delete_current()?;
2957                    }
2958                }
2959
2960                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2961                    let storage_entry =
2962                        StorageEntry { key: *storage_key, value: *old_storage_value };
2963                    if plain_storage_cursor
2964                        .seek_by_key_subkey(*address, *storage_key)?
2965                        .is_some_and(|s| s.key == *storage_key)
2966                    {
2967                        plain_storage_cursor.delete_current()?
2968                    }
2969
2970                    if !old_storage_value.is_zero() {
2971                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2972                    }
2973                }
2974            }
2975
2976            (state, reverts)
2977        };
2978
2979        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2980        let mut receipts_iter = self
2981            .static_file_provider
2982            .get_range_with_static_file_or_database(
2983                StaticFileSegment::Receipts,
2984                from_transaction_num..to_transaction_num + 1,
2985                |static_file, range, _| {
2986                    static_file
2987                        .receipts_by_tx_range(range.clone())
2988                        .map(|r| range.into_iter().zip(r).collect())
2989                },
2990                |range, _| {
2991                    self.tx
2992                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2993                        .walk_range(range)?
2994                        .map(|r| r.map_err(Into::into))
2995                        .collect()
2996                },
2997                |_| true,
2998            )?
2999            .into_iter()
3000            .peekable();
3001
3002        let mut receipts = Vec::with_capacity(block_bodies.len());
3003        // loop break if we are at the end of the blocks.
3004        for block_body in block_bodies {
3005            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3006            for num in block_body.tx_num_range() {
3007                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3008                    block_receipts.push(receipts_iter.next().unwrap().1);
3009                }
3010            }
3011            receipts.push(block_receipts);
3012        }
3013
3014        self.remove_receipts_from(from_transaction_num, block)?;
3015
3016        Ok(ExecutionOutcome::new_init(
3017            state,
3018            reverts,
3019            Vec::new(),
3020            receipts,
3021            start_block_number,
3022            Vec::new(),
3023        ))
3024    }
3025}
3026
3027impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3028    fn write_account_trie_updates<A: TrieTableAdapter>(
3029        tx: &TX,
3030        trie_updates: &TrieUpdatesSorted,
3031        num_entries: &mut usize,
3032    ) -> ProviderResult<()>
3033    where
3034        TX: DbTxMut,
3035    {
3036        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3037        // Process sorted account nodes
3038        for (key, updated_node) in trie_updates.account_nodes_ref() {
3039            let nibbles = A::AccountKey::from(*key);
3040            match updated_node {
3041                Some(node) => {
3042                    if !key.is_empty() {
3043                        *num_entries += 1;
3044                        account_trie_cursor.upsert(nibbles, node)?;
3045                    }
3046                }
3047                None => {
3048                    *num_entries += 1;
3049                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3050                        account_trie_cursor.delete_current()?;
3051                    }
3052                }
3053            }
3054        }
3055        Ok(())
3056    }
3057
3058    fn write_storage_tries<A: TrieTableAdapter>(
3059        tx: &TX,
3060        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3061        num_entries: &mut usize,
3062    ) -> ProviderResult<()>
3063    where
3064        TX: DbTxMut,
3065    {
3066        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3067        for (hashed_address, storage_trie_updates) in storage_tries {
3068            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3069                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3070            *num_entries +=
3071                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3072            cursor = db_storage_trie_cursor.cursor;
3073        }
3074        Ok(())
3075    }
3076}
3077
3078impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3079    /// Writes trie updates to the database with already sorted updates.
3080    ///
3081    /// Returns the number of entries modified.
3082    #[instrument(level = "debug", target = "providers::db", skip_all)]
3083    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3084        if trie_updates.is_empty() {
3085            return Ok(0)
3086        }
3087
3088        // Track the number of inserted entries.
3089        let mut num_entries = 0;
3090
3091        reth_trie_db::with_adapter!(self, |A| {
3092            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3093        });
3094
3095        num_entries +=
3096            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3097
3098        Ok(num_entries)
3099    }
3100}
3101
3102impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3103    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3104    ///
3105    /// Expects the storage trie updates to already be sorted by the hashed address key.
3106    ///
3107    /// Returns the number of entries modified.
3108    fn write_storage_trie_updates_sorted<'a>(
3109        &self,
3110        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3111    ) -> ProviderResult<usize> {
3112        let mut num_entries = 0;
3113        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3114        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3115        reth_trie_db::with_adapter!(self, |A| {
3116            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3117        });
3118        Ok(num_entries)
3119    }
3120}
3121
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Unwinds `HashedAccounts` using the given account changesets, restoring each
    /// account to its pre-change state (`AccountBeforeTx::info`).
    ///
    /// Returns the resulting hashed-address -> account map that was applied.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Aggregate all block changesets and make a list of accounts that have been changed.
        // Note that collecting and then reversing the order is necessary to ensure that the
        // changes are applied in the correct order.
        // Reversing before collecting into the map means that for an account changed in
        // multiple blocks, the OLDEST pre-state entry wins (later inserts overwrite).
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        // Apply values to HashedState, and remove the account if it's None.
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                // Account did not exist before: delete its current hashed entry if any.
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Convenience wrapper: loads the account changesets for `range` and unwinds the
    /// hashed account state with them.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self.account_changesets_range(range)?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Hashes the given plain account states and applies them to `HashedAccounts`.
    ///
    /// A `None` account removes the hashed entry. Returns the applied map.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Unwinds `HashedStorages` using the given storage changesets, restoring each slot
    /// to its pre-change value (zero values delete the slot).
    ///
    /// Returns the set of hashed storage keys touched, per hashed address.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Aggregate all block changesets and make list of accounts that have been changed.
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                let hashed_key = keccak256(storage_entry.key);
                (keccak256(address), hashed_key, storage_entry.value)
            })
            .collect::<Vec<_>>();
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        // Apply values to HashedState, and remove the account if it's None.
        // Iterating in reverse means the oldest changeset value is applied last,
        // mirroring the account unwind above.
        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            // Dup-sort table: seek may land on a later subkey, so confirm the exact key
            // before deleting the current entry.
            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .is_some_and(|entry| entry.key == key)
            {
                hashed_storage.delete_current()?;
            }

            // Zero value means the slot did not exist before — leave it deleted.
            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Convenience wrapper: loads the storage changesets for `range` and unwinds the
    /// hashed storage state with them.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let changesets = self.storage_changesets_range(range)?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Hashes the given plain storage entries and applies them to `HashedStorages`
    /// (zero values delete the slot).
    ///
    /// Returns the set of hashed storage keys touched, per hashed address.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // hash values
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Hash the address and key and apply them to HashedStorage (if Storage is None
        // just remove it);
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                // Delete any existing entry for this exact subkey before upserting.
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .is_some_and(|entry| entry.key == key)
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
3261
3262impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3263    fn unwind_account_history_indices<'a>(
3264        &self,
3265        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3266    ) -> ProviderResult<usize> {
3267        let mut last_indices = changesets
3268            .into_iter()
3269            .map(|(index, account)| (account.address, *index))
3270            .collect::<Vec<_>>();
3271        last_indices.sort_unstable_by_key(|(a, _)| *a);
3272
3273        if self.cached_storage_settings().storage_v2 {
3274            #[cfg(all(unix, feature = "rocksdb"))]
3275            {
3276                let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3277                self.pending_rocksdb_batches.lock().push(batch);
3278            }
3279        } else {
3280            // Unwind the account history index in MDBX.
3281            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3282            for &(address, rem_index) in &last_indices {
3283                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3284                    &mut cursor,
3285                    ShardedKey::last(address),
3286                    rem_index,
3287                    |sharded_key| sharded_key.key == address,
3288                )?;
3289
3290                // Check the last returned partial shard.
3291                // If it's not empty, the shard needs to be reinserted.
3292                if !partial_shard.is_empty() {
3293                    cursor.insert(
3294                        ShardedKey::last(address),
3295                        &BlockNumberList::new_pre_sorted(partial_shard),
3296                    )?;
3297                }
3298            }
3299        }
3300
3301        let changesets = last_indices.len();
3302        Ok(changesets)
3303    }
3304
3305    fn unwind_account_history_indices_range(
3306        &self,
3307        range: impl RangeBounds<BlockNumber>,
3308    ) -> ProviderResult<usize> {
3309        let changesets = self.account_changesets_range(range)?;
3310        self.unwind_account_history_indices(changesets.iter())
3311    }
3312
3313    fn insert_account_history_index(
3314        &self,
3315        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3316    ) -> ProviderResult<()> {
3317        self.append_history_index::<_, tables::AccountsHistory>(
3318            account_transitions,
3319            ShardedKey::new,
3320        )
3321    }
3322
3323    fn unwind_storage_history_indices(
3324        &self,
3325        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3326    ) -> ProviderResult<usize> {
3327        let mut storage_changesets = changesets
3328            .into_iter()
3329            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3330            .collect::<Vec<_>>();
3331        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
3332
3333        if self.cached_storage_settings().storage_v2 {
3334            #[cfg(all(unix, feature = "rocksdb"))]
3335            {
3336                let batch =
3337                    self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3338                self.pending_rocksdb_batches.lock().push(batch);
3339            }
3340        } else {
3341            // Unwind the storage history index in MDBX.
3342            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3343            for &(address, storage_key, rem_index) in &storage_changesets {
3344                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3345                    &mut cursor,
3346                    StorageShardedKey::last(address, storage_key),
3347                    rem_index,
3348                    |storage_sharded_key| {
3349                        storage_sharded_key.address == address &&
3350                            storage_sharded_key.sharded_key.key == storage_key
3351                    },
3352                )?;
3353
3354                // Check the last returned partial shard.
3355                // If it's not empty, the shard needs to be reinserted.
3356                if !partial_shard.is_empty() {
3357                    cursor.insert(
3358                        StorageShardedKey::last(address, storage_key),
3359                        &BlockNumberList::new_pre_sorted(partial_shard),
3360                    )?;
3361                }
3362            }
3363        }
3364
3365        let changesets = storage_changesets.len();
3366        Ok(changesets)
3367    }
3368
3369    fn unwind_storage_history_indices_range(
3370        &self,
3371        range: impl RangeBounds<BlockNumber>,
3372    ) -> ProviderResult<usize> {
3373        let changesets = self.storage_changesets_range(range)?;
3374        self.unwind_storage_history_indices(changesets.into_iter())
3375    }
3376
3377    fn insert_storage_history_index(
3378        &self,
3379        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3380    ) -> ProviderResult<()> {
3381        self.append_history_index::<_, tables::StoragesHistory>(
3382            storage_transitions,
3383            |(address, storage_key), highest_block_number| {
3384                StorageShardedKey::new(address, storage_key, highest_block_number)
3385            },
3386        )
3387    }
3388
3389    #[instrument(level = "debug", target = "providers::db", skip_all)]
3390    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3391        let storage_settings = self.cached_storage_settings();
3392        if !storage_settings.storage_v2 {
3393            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3394            self.insert_account_history_index(indices)?;
3395        }
3396
3397        if !storage_settings.storage_v2 {
3398            let indices = self.changed_storages_and_blocks_with_range(range)?;
3399            self.insert_storage_history_index(indices)?;
3400        }
3401
3402        Ok(())
3403    }
3404}
3405
3406impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3407    for DatabaseProvider<TX, N>
3408{
3409    fn take_block_and_execution_above(
3410        &self,
3411        block: BlockNumber,
3412    ) -> ProviderResult<Chain<Self::Primitives>> {
3413        let range = block + 1..=self.last_block_number()?;
3414
3415        self.unwind_trie_state_from(block + 1)?;
3416
3417        // get execution res
3418        let execution_state = self.take_state_above(block)?;
3419
3420        let blocks = self.recovered_block_range(range)?;
3421
3422        // remove block bodies it is needed for both get block range and get block execution results
3423        // that is why it is deleted afterwards.
3424        self.remove_blocks_above(block)?;
3425
3426        // Update pipeline progress
3427        self.update_pipeline_stages(block, true)?;
3428
3429        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3430    }
3431
3432    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3433        self.unwind_trie_state_from(block + 1)?;
3434
3435        // remove execution res
3436        self.remove_state_above(block)?;
3437
3438        // remove block bodies it is needed for both get block range and get block execution results
3439        // that is why it is deleted afterwards.
3440        self.remove_blocks_above(block)?;
3441
3442        // Update pipeline progress
3443        self.update_pipeline_stages(block, true)?;
3444
3445        Ok(())
3446    }
3447}
3448
3449impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3450    for DatabaseProvider<TX, N>
3451{
3452    type Block = BlockTy<N>;
3453    type Receipt = ReceiptTy<N>;
3454
3455    /// Inserts the block into the database, writing to both static files and MDBX.
3456    ///
3457    /// This is a convenience method primarily used in tests. For production use,
3458    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3459    fn insert_block(
3460        &self,
3461        block: &RecoveredBlock<Self::Block>,
3462    ) -> ProviderResult<StoredBlockBodyIndices> {
3463        let block_number = block.number();
3464
3465        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3466        let executed_block = ExecutedBlock::new(
3467            Arc::new(block.clone()),
3468            Arc::new(BlockExecutionOutput {
3469                result: BlockExecutionResult {
3470                    receipts: Default::default(),
3471                    requests: Default::default(),
3472                    gas_used: 0,
3473                    blob_gas_used: 0,
3474                },
3475                state: Default::default(),
3476            }),
3477            ComputedTrieData::default(),
3478        );
3479
3480        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3481        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3482
3483        // Return the body indices
3484        self.block_body_indices(block_number)?
3485            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3486    }
3487
    /// Appends block bodies, writing transactions to static files and body indices /
    /// transaction-block mappings to MDBX.
    ///
    /// `None` bodies still get an (empty) `BlockBodyIndices` entry. Assumes `bodies`
    /// is in ascending block-number order starting right after the current tip.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        // Nothing to do for empty input.
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        // Initialize writer if we will be writing transactions to staticfiles
        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Get id for the next tx_num or zero if there are no transactions.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Increment block on static file header.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            // insert block meta
            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            // Empty body: only the indices entry above is needed.
            let Some(body) = body else { continue };

            // write transaction block index
            // (maps the block's last tx number to the block number)
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            // write transactions
            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                // Increment transaction id for each transaction.
                next_tx_num += 1;
            }
        }

        // Let the chain-specific storage writer persist any additional body tables.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }
3539
    /// Removes all blocks above `block` (exclusive): header number index, static-file
    /// headers, transaction hash indices, senders, and finally the bodies. `block`
    /// itself is kept.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        // Get highest static file block for the total block range
        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
        // we remove only what is ABOVE the block.
        //
        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
        // will have three blocks to remove, which will be block 8, 7, and 6.
        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction to be removed
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Last transaction to be removed
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            // shouldn't happen because this was OK above
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        // Delete the hash -> tx number index entry for every removed transaction; the
        // writer routes the deletes to MDBX or RocksDB depending on storage settings.
        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
        // exists in static files or the database.
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
            EitherWriter::new_senders(self, last_block_number)?
                .prune_senders(unwind_tx_from, block)?;
        }

        // Finally drop the bodies themselves (indices, tx-block map, static-file txs).
        self.remove_bodies_above(block)?;

        Ok(())
    }
3601
3602    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3603        self.storage.writer().remove_block_bodies_above(self, block)?;
3604
3605        // First transaction to be removed
3606        let unwind_tx_from = self
3607            .block_body_indices(block)?
3608            .map(|b| b.next_tx_num())
3609            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3610
3611        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3612        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3613
3614        let static_file_tx_num =
3615            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3616
3617        let to_delete = static_file_tx_num
3618            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3619            .unwrap_or_default();
3620
3621        self.static_file_provider
3622            .latest_writer(StaticFileSegment::Transactions)?
3623            .prune_transactions(to_delete, block)?;
3624
3625        Ok(())
3626    }
3627
    /// Appends blocks with their execution state to the database.
    ///
    /// **Note:** This function is only used in tests.
    ///
    /// History indices are written to the appropriate backend based on storage settings:
    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
    ///
    /// # Arguments
    /// * `blocks` - recovered blocks to insert; assumed contiguous and ascending, since
    ///   revert entries are mapped to block numbers as `first_number + index`.
    /// * `execution_outcome` - bundle state, reverts and receipts produced by executing
    ///   `blocks`.
    /// * `hashed_state` - sorted hashed post-state written alongside the blocks.
    ///
    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
    fn append_blocks_with_state(
        &self,
        blocks: Vec<RecoveredBlock<Self::Block>>,
        execution_outcome: &ExecutionOutcome<Self::Receipt>,
        hashed_state: HashedPostStateSorted,
    ) -> ProviderResult<()> {
        // Nothing to do for an empty range; logged for visibility since callers
        // normally should not pass one.
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to append empty block range");
            return Ok(())
        }

        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
        // `None`.
        let first_number = blocks[0].number();

        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
        // `None`.
        let last_block_number = blocks[blocks.len() - 1].number();

        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
        // This is necessary because with edge storage, changesets are written to static files
        // whose index isn't updated until commit, making them invisible to subsequent reads
        // within the same transaction.
        let (account_transitions, storage_transitions) = {
            // Maps address -> touched block numbers, and (address, storage key) ->
            // touched block numbers; block numbers are derived positionally from
            // the revert list, so blocks must be contiguous starting at
            // `first_number`.
            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
                let block_number = first_number + block_idx as u64;
                for (address, account_revert) in block_reverts {
                    account_transitions.entry(*address).or_default().push(block_number);
                    for storage_key in account_revert.storage.keys() {
                        let key = B256::from(storage_key.to_be_bytes());
                        storage_transitions.entry((*address, key)).or_default().push(block_number);
                    }
                }
            }
            (account_transitions, storage_transitions)
        };

        // Insert the blocks
        for block in blocks {
            self.insert_block(&block)?;
            durations_recorder.record_relative(metrics::Action::InsertBlock);
        }

        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
        durations_recorder.record_relative(metrics::Action::InsertState);

        // insert hashes and intermediate merkle nodes
        self.write_hashed_state(&hashed_state)?;
        durations_recorder.record_relative(metrics::Action::InsertHashes);

        // Use pre-computed transitions for history indices since static file
        // writes aren't visible until commit.
        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
        // append_*_history_shard which handles read-merge-write internally.
        // NOTE(review): if `storage_v2` is true but the `rocksdb` feature (unix-only)
        // is disabled, the `#[cfg]`-gated statements below compile away and no history
        // indices are written in these branches — confirm that configuration is
        // unreachable in practice.
        let storage_settings = self.cached_storage_settings();
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for (address, blocks) in account_transitions {
                    batch.append_account_history_shard(address, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_account_history_index(account_transitions)?;
        }
        if storage_settings.storage_v2 {
            #[cfg(all(unix, feature = "rocksdb"))]
            self.with_rocksdb_batch(|mut batch| {
                for ((address, key), blocks) in storage_transitions {
                    batch.append_storage_history_shard(address, key, blocks)?;
                }
                Ok(((), Some(batch.into_inner())))
            })?;
        } else {
            self.insert_storage_history_index(storage_transitions)?;
        }
        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);

        // Update pipeline progress
        self.update_pipeline_stages(last_block_number, false)?;
        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);

        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");

        Ok(())
    }
3727}
3728
3729impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3730    fn get_prune_checkpoint(
3731        &self,
3732        segment: PruneSegment,
3733    ) -> ProviderResult<Option<PruneCheckpoint>> {
3734        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3735    }
3736
3737    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3738        Ok(PruneSegment::variants()
3739            .filter_map(|segment| {
3740                self.tx
3741                    .get::<tables::PruneCheckpoints>(segment)
3742                    .transpose()
3743                    .map(|chk| chk.map(|chk| (segment, chk)))
3744            })
3745            .collect::<Result<_, _>>()?)
3746    }
3747}
3748
3749impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3750    fn save_prune_checkpoint(
3751        &self,
3752        segment: PruneSegment,
3753        checkpoint: PruneCheckpoint,
3754    ) -> ProviderResult<()> {
3755        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3756    }
3757}
3758
3759impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3760    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3761        let db_entries = self.tx.entries::<T>()?;
3762        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3763            Ok(entries) => entries,
3764            Err(ProviderError::UnsupportedProvider) => 0,
3765            Err(err) => return Err(err),
3766        };
3767
3768        Ok(db_entries + static_file_entries)
3769    }
3770}
3771
3772impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3773    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3774        let mut finalized_blocks = self
3775            .tx
3776            .cursor_read::<tables::ChainState>()?
3777            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3778            .take(1)
3779            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3780
3781        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3782        Ok(last_finalized_block_number)
3783    }
3784
3785    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3786        let mut finalized_blocks = self
3787            .tx
3788            .cursor_read::<tables::ChainState>()?
3789            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3790            .take(1)
3791            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3792
3793        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3794        Ok(last_finalized_block_number)
3795    }
3796}
3797
3798impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3799    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3800        Ok(self
3801            .tx
3802            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3803    }
3804
3805    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3806        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3807    }
3808}
3809
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the inner transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    // NOTE(review): this call resolves to the *inherent* `prune_modes_ref` method
    // (inherent methods take precedence over trait methods during resolution), not
    // recursively to this trait method — confirm the inherent method exists elsewhere
    // in this file.
    fn prune_modes_ref(&self) -> &PruneModes {
        self.prune_modes_ref()
    }

    /// Commit database transaction, static files, and pending `RocksDB` batches.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        // For unwinding it makes more sense to commit the database first, since if
        // it is interrupted before the static files commit, we can just
        // truncate the static files according to the
        // checkpoints on the next start-up.
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Drain every RocksDB batch queued during this transaction and
                // commit each one.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // Normal path: finalize() will call sync_all() if not already synced.
            // Order here is: static files first, then RocksDB batches, then MDBX last;
            // each stage's duration is recorded for commit metrics.
            let mut timings = metrics::CommitTimings::default();

            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let start = Instant::now();
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
                timings.rocksdb = start.elapsed();
            }

            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3881
3882impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3883    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3884        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3885    }
3886}
3887
3888impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3889    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3890        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3891    }
3892}
3893
3894impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3895    fn cached_storage_settings(&self) -> StorageSettings {
3896        *self.storage_settings.read()
3897    }
3898
3899    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3900        *self.storage_settings.write() = settings;
3901    }
3902}
3903
3904impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3905    fn storage_path(&self) -> PathBuf {
3906        self.db_path.clone()
3907    }
3908}
3909
3910#[cfg(test)]
3911mod tests {
3912    use super::*;
3913    use crate::{
3914        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3915        BlockWriter,
3916    };
3917    use alloy_consensus::Header;
3918    use alloy_primitives::{
3919        map::{AddressMap, B256Map},
3920        U256,
3921    };
3922    use reth_chain_state::ExecutedBlock;
3923    use reth_ethereum_primitives::Receipt;
3924    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3925    use reth_primitives_traits::SealedBlock;
3926    use reth_testing_utils::generators::{self, random_block, BlockParams};
3927    use reth_trie::{
3928        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3929    };
3930    use revm_database::BundleState;
3931    use revm_state::AccountInfo;
3932
3933    #[test]
3934    fn test_receipts_by_block_range_empty_range() {
3935        let factory = create_test_provider_factory();
3936        let provider = factory.provider().unwrap();
3937
3938        // empty range should return empty vec
3939        let start = 10u64;
3940        let end = 9u64;
3941        let result = provider.receipts_by_block_range(start..=end).unwrap();
3942        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3943    }
3944
3945    #[test]
3946    fn test_receipts_by_block_range_nonexistent_blocks() {
3947        let factory = create_test_provider_factory();
3948        let provider = factory.provider().unwrap();
3949
3950        // non-existent blocks should return empty vecs for each block
3951        let result = provider.receipts_by_block_range(10..=12).unwrap();
3952        assert_eq!(result, vec![vec![], vec![], vec![]]);
3953    }
3954
    /// A range covering exactly one populated block returns a single inner vec
    /// containing that block's receipt.
    #[test]
    fn test_receipts_by_block_range_single_block() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis (no receipts) plus one block with its execution state.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.insert_block(&data.blocks[0].0).unwrap();
        provider_rw
            .write_state(
                &data.blocks[0].1,
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=1).unwrap();

        // should have one vec with one receipt
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
    }
3987
    /// A range spanning several populated blocks returns one inner vec per block,
    /// each holding that block's receipt in order.
    #[test]
    fn test_receipts_by_block_range_multiple_blocks() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis (no receipts) followed by blocks 1-3 with their state.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // should have 3 vecs, each with one receipt
        assert_eq!(result.len(), 3);
        for (i, block_receipts) in result.iter().enumerate() {
            assert_eq!(block_receipts.len(), 1);
            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
        }
    }
4024
    /// Each block in the range reports exactly the receipts written for it — here
    /// one receipt per block from the standard test fixture.
    #[test]
    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis with an empty receipt list.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();

        // insert blocks 1-3 with receipts
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // verify each block has one receipt
        assert_eq!(result.len(), 3);
        for block_receipts in &result {
            assert_eq!(block_receipts.len(), 1);
        }
    }
4062
    /// A range mixing existing and non-existing blocks returns receipts for the
    /// populated blocks and empty vecs for the rest, preserving positional order.
    #[test]
    fn test_receipts_by_block_range_partial_range() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Populate genesis plus blocks 1-3; blocks 4-5 remain absent.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // request range that includes both existing and non-existing blocks
        let result = provider.receipts_by_block_range(2..=5).unwrap();
        assert_eq!(result.len(), 4);

        // blocks 2-3 should have receipts, blocks 4-5 should be empty
        assert_eq!(result[0].len(), 1); // block 2
        assert_eq!(result[1].len(), 1); // block 3
        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)

        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
    }
4104
4105    #[test]
4106    fn test_receipts_by_block_range_all_empty_blocks() {
4107        let factory = create_test_provider_factory();
4108        let mut rng = generators::rng();
4109
4110        // create blocks with no transactions
4111        let mut blocks = Vec::new();
4112        for i in 0..3 {
4113            let block =
4114                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4115            blocks.push(block);
4116        }
4117
4118        let provider_rw = factory.provider_rw().unwrap();
4119        for block in blocks {
4120            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4121        }
4122        provider_rw.commit().unwrap();
4123
4124        let provider = factory.provider().unwrap();
4125        let result = provider.receipts_by_block_range(1..=3).unwrap();
4126
4127        assert_eq!(result.len(), 3);
4128        for block_receipts in result {
4129            assert_eq!(block_receipts.len(), 0);
4130        }
4131    }
4132
    /// `receipts_by_block_range` must agree with calling `receipts_by_block` once
    /// per block over the same range.
    #[test]
    fn test_receipts_by_block_range_consistency_with_individual_calls() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Populate genesis plus blocks 1-3 with their execution state.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // get receipts using block range method
        let range_result = provider.receipts_by_block_range(1..=3).unwrap();

        // get receipts using individual block calls
        let mut individual_results = Vec::new();
        for block_num in 1..=3 {
            let receipts =
                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
            individual_results.push(receipts);
        }

        assert_eq!(range_result, individual_results);
    }
4174
    /// End-to-end check of `write_trie_updates_sorted`: pre-populates account and
    /// storage tries, applies a sorted update set containing an update, a deletion,
    /// a fresh insertion, and a full storage wipe, then verifies both the reported
    /// entry count and the resulting table contents.
    #[test]
    fn test_write_trie_updates_sorted() {
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Pre-populate account trie with data that will be deleted
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            // Add account node that will be deleted
            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            // Add account node that will be updated
            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101, // old state_mask (will be updated)
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Pre-populate storage tries with data
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            // Add storage nodes for address1 (one will be deleted)
            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011, // will be deleted
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            // Add storage nodes for address2 (will be wiped)
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // Create sorted account trie updates
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask (updated)
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
        ];

        // Create sorted storage trie updates
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask (no hashes)
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
            ],
        };

        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true, // Wipe all storage for this address
            storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the sorted trie updates
        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
        // storage deletion = 5
        assert_eq!(num_entries, 5);

        // Verify account trie updates were written correctly
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        // Check first account node was updated
        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        // Check deleted account node no longer exists
        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        // Check new account node exists
        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        // Verify storage trie updates were written correctly
        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        // Check storage for address1
        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        // Check storage for address2 was wiped
        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4394
4395    #[test]
4396    fn test_prunable_receipts_logic() {
4397        let insert_blocks =
4398            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4399                let mut rng = generators::rng();
4400                for block_num in 0..=tip_block {
4401                    let block = random_block(
4402                        &mut rng,
4403                        block_num,
4404                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4405                    );
4406                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4407                }
4408            };
4409
4410        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4411            let outcome = ExecutionOutcome {
4412                first_block: block,
4413                receipts: vec![vec![Receipt {
4414                    tx_type: Default::default(),
4415                    success: true,
4416                    cumulative_gas_used: block, // identifier to assert against
4417                    logs: vec![],
4418                }]],
4419                ..Default::default()
4420            };
4421            provider_rw
4422                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4423                .unwrap();
4424            provider_rw.commit().unwrap();
4425        };
4426
4427        // Legacy mode (receipts in DB) - should be prunable
4428        {
4429            let factory = create_test_provider_factory();
4430            let storage_settings = StorageSettings::v1();
4431            factory.set_storage_settings_cache(storage_settings);
4432            let factory = factory.with_prune_modes(PruneModes {
4433                receipts: Some(PruneMode::Before(100)),
4434                ..Default::default()
4435            });
4436
4437            let tip_block = 200u64;
4438            let first_block = 1u64;
4439
4440            // create chain
4441            let provider_rw = factory.provider_rw().unwrap();
4442            insert_blocks(&provider_rw, tip_block, 1);
4443            provider_rw.commit().unwrap();
4444
4445            write_receipts(
4446                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4447                first_block,
4448            );
4449            write_receipts(
4450                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4451                tip_block - 1,
4452            );
4453
4454            let provider = factory.provider().unwrap();
4455
4456            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4457                assert!(provider
4458                    .receipts_by_block(block.into())
4459                    .unwrap()
4460                    .is_some_and(|r| r.len() == num_receipts));
4461            }
4462        }
4463
4464        // Static files mode
4465        {
4466            let factory = create_test_provider_factory();
4467            let storage_settings = StorageSettings::v2();
4468            factory.set_storage_settings_cache(storage_settings);
4469            let factory = factory.with_prune_modes(PruneModes {
4470                receipts: Some(PruneMode::Before(2)),
4471                ..Default::default()
4472            });
4473
4474            let tip_block = 200u64;
4475
4476            // create chain
4477            let provider_rw = factory.provider_rw().unwrap();
4478            insert_blocks(&provider_rw, tip_block, 1);
4479            provider_rw.commit().unwrap();
4480
4481            // Attempt to write receipts for block 0 and 1 (should be skipped)
4482            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4483            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4484
4485            assert!(factory
4486                .static_file_provider()
4487                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4488                .is_none(),);
4489            assert!(factory
4490                .static_file_provider()
4491                .get_highest_static_file_block(StaticFileSegment::Receipts)
4492                .is_some_and(|b| b == 1),);
4493
4494            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4495            // static files.
4496            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4497            assert!(factory
4498                .static_file_provider()
4499                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4500                .is_some_and(|num| num == 2),);
4501
4502            // After having a receipt already in static files, attempt to skip the next receipt by
4503            // changing the prune mode. It should NOT skip it and should still write the receipt,
4504            // since static files do not support gaps.
4505            let factory = factory.with_prune_modes(PruneModes {
4506                receipts: Some(PruneMode::Before(100)),
4507                ..Default::default()
4508            });
4509            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4510            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4511            write_receipts(provider_rw, 3);
4512
4513            // Ensure we can only fetch the 2 last receipts.
4514            //
4515            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4516            // to the block number it belongs to easily identify and assert.
4517            let provider = factory.provider().unwrap();
4518            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4519            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4520                assert!(provider
4521                    .receipts_by_block(num.into())
4522                    .unwrap()
4523                    .is_some_and(|r| r.len() == num_receipts));
4524
4525                let receipt = provider.receipt(num).unwrap();
4526                if num_receipts > 0 {
4527                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4528                } else {
4529                    assert!(receipt.is_none());
4530                }
4531            }
4532        }
4533    }
4534
4535    #[test]
4536    fn test_try_into_history_rejects_unexecuted_blocks() {
4537        use reth_storage_api::TryIntoHistoricalStateProvider;
4538
4539        let factory = create_test_provider_factory();
4540
4541        // Insert genesis block to have some data
4542        let data = BlockchainTestData::default();
4543        let provider_rw = factory.provider_rw().unwrap();
4544        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4545        provider_rw
4546            .write_state(
4547                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4548                crate::OriginalValuesKnown::No,
4549                StateWriteConfig::default(),
4550            )
4551            .unwrap();
4552        provider_rw.commit().unwrap();
4553
4554        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4555        let provider = factory.provider().unwrap();
4556
4557        // Requesting historical state for block 0 (executed) should succeed
4558        let result = provider.try_into_history_at_block(0);
4559        assert!(result.is_ok(), "Block 0 should be available");
4560
4561        // Get another provider and request state for block 100 (not executed)
4562        let provider = factory.provider().unwrap();
4563        let result = provider.try_into_history_at_block(100);
4564
4565        // Should fail with BlockNotExecuted error
4566        match result {
4567            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4568            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4569            Ok(_) => panic!("Expected error, got Ok"),
4570        }
4571    }
4572
4573    #[test]
4574    fn test_unwind_storage_hashing_with_hashed_state() {
4575        let factory = create_test_provider_factory();
4576        let storage_settings = StorageSettings::v2();
4577        factory.set_storage_settings_cache(storage_settings);
4578
4579        let address = Address::random();
4580        let hashed_address = keccak256(address);
4581
4582        let plain_slot = B256::random();
4583        let hashed_slot = keccak256(plain_slot);
4584
4585        let current_value = U256::from(100);
4586        let old_value = U256::from(42);
4587
4588        let provider_rw = factory.provider_rw().unwrap();
4589        provider_rw
4590            .tx
4591            .cursor_dup_write::<tables::HashedStorages>()
4592            .unwrap()
4593            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4594            .unwrap();
4595
4596        let changesets = vec![(
4597            BlockNumberAddress((1, address)),
4598            StorageEntry { key: plain_slot, value: old_value },
4599        )];
4600
4601        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4602
4603        assert_eq!(result.len(), 1);
4604        assert!(result.contains_key(&hashed_address));
4605        assert!(result[&hashed_address].contains(&hashed_slot));
4606
4607        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4608        let entry = cursor
4609            .seek_by_key_subkey(hashed_address, hashed_slot)
4610            .unwrap()
4611            .expect("entry should exist");
4612        assert_eq!(entry.key, hashed_slot);
4613        assert_eq!(entry.value, old_value);
4614    }
4615
    /// Round-trips state through `write_state` and `remove_state_above` under the legacy
    /// (v1) storage schema: writes an account nonce bump plus one storage slot for block 1,
    /// verifies plain state, hashed state, and both changeset tables, then unwinds back to
    /// block 0 and verifies everything was restored or cleared.
    #[test]
    fn test_write_and_remove_state_roundtrip_legacy() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v1();
        // v1 keeps plain (unhashed) state as the primary representation.
        assert!(!storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        // Fixed account and slot; precompute the hashed forms used by HashedStorages.
        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        let mut rng = generators::rng();
        let block0 =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
        let block1 =
            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

        // Seed two blocks and the pre-state: account with nonce 0 and no storage.
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::PlainAccountState>()
                .unwrap()
                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Post-state for block 1: nonce 0 -> 1, slot 5 goes 0 -> 10.
        let mut state_init: BundleStateInit = AddressMap::default();
        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
        state_init.insert(
            address,
            (
                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
                storage_map,
            ),
        );

        // Reverts for block 1 record the pre-state (nonce 0, slot value 0) so the
        // change can later be unwound.
        let mut reverts_init: RevertsInit = HashMap::default();
        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
        block_reverts.insert(
            address,
            (
                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
            ),
        );
        reverts_init.insert(1, block_reverts);

        let execution_outcome =
            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

        // Write state with both changeset kinds enabled (receipts not needed here).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Also populate the hashed tables, as a pipeline hashing stage would.
        let hashed_state =
            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Plain account reflects the post-state nonce.
        let account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(account.nonce, 1);

        // Plain storage holds the new slot value under the plain (unhashed) key.
        let storage_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap()
            .unwrap();
        assert_eq!(storage_entry.key, slot_key);
        assert_eq!(storage_entry.value, U256::from(10));

        // Hashed storage mirrors the same value under keccak-hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v1 writes account changesets into MDBX, keyed by block number.
        let account_cs_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!account_cs_entries.is_empty());

        // Storage changesets are keyed by (block, address) and carry the plain slot key.
        let storage_cs_entries = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!storage_cs_entries.is_empty());
        assert_eq!(storage_cs_entries[0].1.key, slot_key);

        // Unwind everything above block 0; block 1's changes must be rolled back.
        provider_rw.remove_state_above(0).unwrap();

        // Account is restored to its pre-state nonce.
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(restored_account.nonce, 0);

        // The slot's pre-state value was zero, so the entry should be gone (or at least
        // the seek should not land on this slot).
        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap();
        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);

        // Both changeset tables must be empty again for block 1.
        let account_cs_after = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(account_cs_after.is_empty());

        let storage_cs_after = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(storage_cs_after.is_empty());
    }
4783
4784    #[test]
4785    fn test_unwind_storage_hashing_legacy() {
4786        let factory = create_test_provider_factory();
4787        let storage_settings = StorageSettings::v1();
4788        assert!(!storage_settings.use_hashed_state());
4789        factory.set_storage_settings_cache(storage_settings);
4790
4791        let address = Address::random();
4792        let hashed_address = keccak256(address);
4793
4794        let plain_slot = B256::random();
4795        let hashed_slot = keccak256(plain_slot);
4796
4797        let current_value = U256::from(100);
4798        let old_value = U256::from(42);
4799
4800        let provider_rw = factory.provider_rw().unwrap();
4801        provider_rw
4802            .tx
4803            .cursor_dup_write::<tables::HashedStorages>()
4804            .unwrap()
4805            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4806            .unwrap();
4807
4808        let changesets = vec![(
4809            BlockNumberAddress((1, address)),
4810            StorageEntry { key: plain_slot, value: old_value },
4811        )];
4812
4813        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4814
4815        assert_eq!(result.len(), 1);
4816        assert!(result.contains_key(&hashed_address));
4817        assert!(result[&hashed_address].contains(&hashed_slot));
4818
4819        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4820        let entry = cursor
4821            .seek_by_key_subkey(hashed_address, hashed_slot)
4822            .unwrap()
4823            .expect("entry should exist");
4824        assert_eq!(entry.key, hashed_slot);
4825        assert_eq!(entry.value, old_value);
4826    }
4827
    /// Under the v2 (hashed-state) schema, `write_state` must populate `HashedStorages`
    /// directly (leaving plain state empty), route changesets to static files, and keep
    /// historical reads working via those static-file changesets.
    #[test]
    fn test_write_state_and_historical_read_hashed() {
        use reth_storage_api::StateProvider;
        use reth_trie::{HashedPostState, KeccakKeyHasher};
        use revm_database::BundleState;
        use revm_state::AccountInfo;

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // One account, one slot; precompute hashed forms for the hashed-table asserts.
        let address = Address::with_last_byte(1);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(slot_key);

        // Seed static files: headers for blocks 0 and 1 and empty changesets for block 0
        // — presumably so the block-1 changeset appended below doesn't create a gap in
        // either segment (TODO confirm against the static-file gap invariant).
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block 1: create the account (nonce 1, balance 10) and set slot 5 to 10, with
        // reverts recording that neither existed before (account None, slot value 0).
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        // Block-body indices for block 1 (no transactions) so block-level lookups resolve.
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();

        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Hash the bundle's post-state and write it to the hashed tables.
        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // v2 must not touch plain storage at all.
        let plain_storage_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .walk(None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(plain_storage_entries.is_empty());

        // The slot value lives in HashedStorages under the hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // Commit pending static-file writes so the changesets are readable below.
        provider_rw.static_file_provider().commit().unwrap();

        // Changesets for block 1 are in static files, keyed by the *plain* slot key.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty());
        assert_eq!(storage_cs[0].1.key, slot_key);

        let account_cs = sf.account_block_changeset(1).unwrap();
        assert!(!account_cs.is_empty());
        assert_eq!(account_cs[0].address, address);

        // Historical read at block 0, i.e. before the block-1 write: the recorded
        // pre-state value was zero, so no value is returned.
        let historical_value =
            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
        assert_eq!(historical_value, None);
    }
4935
    /// Storage schema flavor exercised by `run_save_blocks_and_verify`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StorageMode {
        /// Legacy layout: plain state and changesets kept in MDBX tables.
        V1,
        /// Hashed-state layout: plain tables stay empty, changesets go to static files.
        V2,
    }
4941
4942    fn run_save_blocks_and_verify(mode: StorageMode) {
4943        use alloy_primitives::map::{FbBuildHasher, HashMap};
4944
4945        let factory = create_test_provider_factory();
4946
4947        match mode {
4948            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4949            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4950        }
4951
4952        let num_blocks = 3u64;
4953        let accounts_per_block = 5usize;
4954        let slots_per_account = 3usize;
4955
4956        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4957            SealedHeader::new(
4958                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4959                B256::ZERO,
4960            ),
4961            Default::default(),
4962        );
4963
4964        let genesis_executed = ExecutedBlock::new(
4965            Arc::new(genesis.try_recover().unwrap()),
4966            Arc::new(BlockExecutionOutput {
4967                result: BlockExecutionResult {
4968                    receipts: vec![],
4969                    requests: Default::default(),
4970                    gas_used: 0,
4971                    blob_gas_used: 0,
4972                },
4973                state: Default::default(),
4974            }),
4975            ComputedTrieData::default(),
4976        );
4977        let provider_rw = factory.provider_rw().unwrap();
4978        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
4979        provider_rw.commit().unwrap();
4980
4981        let mut blocks: Vec<ExecutedBlock> = Vec::new();
4982        let mut parent_hash = B256::ZERO;
4983
4984        for block_num in 1..=num_blocks {
4985            let mut builder = BundleState::builder(block_num..=block_num);
4986
4987            for acct_idx in 0..accounts_per_block {
4988                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
4989                let info = AccountInfo {
4990                    nonce: block_num,
4991                    balance: U256::from(block_num * 100 + acct_idx as u64),
4992                    ..Default::default()
4993                };
4994
4995                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
4996                    slots_per_account as u64)
4997                    .map(|s| {
4998                        (
4999                            U256::from(s + acct_idx as u64 * 100),
5000                            (U256::ZERO, U256::from(block_num * 1000 + s)),
5001                        )
5002                    })
5003                    .collect();
5004
5005                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5006                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5007                    .collect();
5008
5009                builder = builder
5010                    .state_present_account_info(address, info)
5011                    .revert_account_info(block_num, address, Some(None))
5012                    .state_storage(address, storage)
5013                    .revert_storage(block_num, address, revert_storage);
5014            }
5015
5016            let bundle = builder.build();
5017
5018            let hashed_state =
5019                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5020
5021            let header = Header {
5022                number: block_num,
5023                parent_hash,
5024                difficulty: U256::from(1),
5025                ..Default::default()
5026            };
5027            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5028                header,
5029                Default::default(),
5030            );
5031            parent_hash = block.hash();
5032
5033            let executed = ExecutedBlock::new(
5034                Arc::new(block.try_recover().unwrap()),
5035                Arc::new(BlockExecutionOutput {
5036                    result: BlockExecutionResult {
5037                        receipts: vec![],
5038                        requests: Default::default(),
5039                        gas_used: 0,
5040                        blob_gas_used: 0,
5041                    },
5042                    state: bundle,
5043                }),
5044                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5045            );
5046            blocks.push(executed);
5047        }
5048
5049        let provider_rw = factory.provider_rw().unwrap();
5050        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5051        provider_rw.commit().unwrap();
5052
5053        let provider = factory.provider().unwrap();
5054
5055        for block_num in 1..=num_blocks {
5056            for acct_idx in 0..accounts_per_block {
5057                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5058                let hashed_address = keccak256(address);
5059
5060                let ha_entry = provider
5061                    .tx_ref()
5062                    .cursor_read::<tables::HashedAccounts>()
5063                    .unwrap()
5064                    .seek_exact(hashed_address)
5065                    .unwrap();
5066                assert!(
5067                    ha_entry.is_some(),
5068                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5069                );
5070
5071                for s in 1..=slots_per_account as u64 {
5072                    let slot = U256::from(s + acct_idx as u64 * 100);
5073                    let slot_key = B256::from(slot);
5074                    let hashed_slot = keccak256(slot_key);
5075
5076                    let hs_entry = provider
5077                        .tx_ref()
5078                        .cursor_dup_read::<tables::HashedStorages>()
5079                        .unwrap()
5080                        .seek_by_key_subkey(hashed_address, hashed_slot)
5081                        .unwrap();
5082                    assert!(
5083                        hs_entry.is_some(),
5084                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5085                    );
5086                    let entry = hs_entry.unwrap();
5087                    assert_eq!(entry.key, hashed_slot);
5088                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5089                }
5090            }
5091        }
5092
5093        for block_num in 1..=num_blocks {
5094            let header = provider.header_by_number(block_num).unwrap();
5095            assert!(header.is_some(), "Header missing for block {block_num}");
5096
5097            let indices = provider.block_body_indices(block_num).unwrap();
5098            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5099        }
5100
5101        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5102        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5103
5104        if mode == StorageMode::V2 {
5105            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5106            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5107
5108            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5109            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5110
5111            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5112            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5113
5114            provider.static_file_provider().commit().unwrap();
5115            let sf = factory.static_file_provider();
5116
5117            for block_num in 1..=num_blocks {
5118                let account_cs = sf.account_block_changeset(block_num).unwrap();
5119                assert!(
5120                    !account_cs.is_empty(),
5121                    "v2: static file AccountChangeSets should exist for block {block_num}"
5122                );
5123
5124                let storage_cs = sf.storage_changeset(block_num).unwrap();
5125                assert!(
5126                    !storage_cs.is_empty(),
5127                    "v2: static file StorageChangeSets should exist for block {block_num}"
5128                );
5129
5130                for (_, entry) in &storage_cs {
5131                    assert!(
5132                        entry.key != keccak256(entry.key),
5133                        "v2: static file storage changeset should have plain slot keys"
5134                    );
5135                }
5136            }
5137
5138            #[cfg(all(unix, feature = "rocksdb"))]
5139            {
5140                let rocksdb = factory.rocksdb_provider();
5141                for block_num in 1..=num_blocks {
5142                    for acct_idx in 0..accounts_per_block {
5143                        let address =
5144                            Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5145                        let shards = rocksdb.account_history_shards(address).unwrap();
5146                        assert!(
5147                            !shards.is_empty(),
5148                            "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5149                        );
5150
5151                        for s in 1..=slots_per_account as u64 {
5152                            let slot = U256::from(s + acct_idx as u64 * 100);
5153                            let slot_key = B256::from(slot);
5154                            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5155                            assert!(
5156                                !shards.is_empty(),
5157                                "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5158                            );
5159                        }
5160                    }
5161                }
5162            }
5163        } else {
5164            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5165            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5166
5167            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5168            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5169
5170            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5171            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5172
5173            for block_num in 1..=num_blocks {
5174                let storage_entries: Vec<_> = provider
5175                    .tx_ref()
5176                    .cursor_dup_read::<tables::StorageChangeSets>()
5177                    .unwrap()
5178                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5179                    .unwrap()
5180                    .collect::<Result<Vec<_>, _>>()
5181                    .unwrap();
5182                assert!(
5183                    !storage_entries.is_empty(),
5184                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5185                );
5186
5187                for (_, entry) in &storage_entries {
5188                    let slot_key = B256::from(entry.key);
5189                    assert!(
5190                        slot_key != keccak256(slot_key),
5191                        "v1: storage changeset keys should be plain (not hashed)"
5192                    );
5193                }
5194            }
5195
5196            let mdbx_account_history =
5197                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5198            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5199
5200            let mdbx_storage_history =
5201                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5202            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5203        }
5204    }
5205
    // End-to-end check of the legacy (v1) layout: after saving blocks, plain
    // state, MDBX changesets (with plain slot keys), and MDBX history indices
    // must all be populated.
    #[test]
    fn test_save_blocks_v1_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V1);
    }
5210
    // End-to-end check of the v2 layout: after saving blocks, changesets live
    // in static files (with plain slot keys) and — when the `rocksdb` feature
    // is enabled — history indices live in RocksDB.
    #[test]
    fn test_save_blocks_v2_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V2);
    }
5215
5216    #[test]
5217    fn test_write_and_remove_state_roundtrip_v2() {
5218        let factory = create_test_provider_factory();
5219        let storage_settings = StorageSettings::v2();
5220        assert!(storage_settings.use_hashed_state());
5221        factory.set_storage_settings_cache(storage_settings);
5222
5223        let address = Address::with_last_byte(1);
5224        let hashed_address = keccak256(address);
5225        let slot = U256::from(5);
5226        let slot_key = B256::from(slot);
5227        let hashed_slot = keccak256(slot_key);
5228
5229        {
5230            let sf = factory.static_file_provider();
5231            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5232            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5233            hw.append_header(&h0, &B256::ZERO).unwrap();
5234            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5235            hw.append_header(&h1, &B256::ZERO).unwrap();
5236            hw.commit().unwrap();
5237
5238            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5239            aw.append_account_changeset(vec![], 0).unwrap();
5240            aw.commit().unwrap();
5241
5242            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5243            sw.append_storage_changeset(vec![], 0).unwrap();
5244            sw.commit().unwrap();
5245        }
5246
5247        {
5248            let provider_rw = factory.provider_rw().unwrap();
5249            provider_rw
5250                .tx
5251                .put::<tables::BlockBodyIndices>(
5252                    0,
5253                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5254                )
5255                .unwrap();
5256            provider_rw
5257                .tx
5258                .put::<tables::BlockBodyIndices>(
5259                    1,
5260                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5261                )
5262                .unwrap();
5263            provider_rw
5264                .tx
5265                .cursor_write::<tables::HashedAccounts>()
5266                .unwrap()
5267                .upsert(
5268                    hashed_address,
5269                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5270                )
5271                .unwrap();
5272            provider_rw.commit().unwrap();
5273        }
5274
5275        let provider_rw = factory.provider_rw().unwrap();
5276
5277        let bundle = BundleState::builder(1..=1)
5278            .state_present_account_info(
5279                address,
5280                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5281            )
5282            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5283            .revert_account_info(1, address, Some(None))
5284            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5285            .build();
5286
5287        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5288
5289        provider_rw
5290            .write_state(
5291                &execution_outcome,
5292                OriginalValuesKnown::Yes,
5293                StateWriteConfig {
5294                    write_receipts: false,
5295                    write_account_changesets: true,
5296                    write_storage_changesets: true,
5297                },
5298            )
5299            .unwrap();
5300
5301        let hashed_state =
5302            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5303        provider_rw.write_hashed_state(&hashed_state).unwrap();
5304
5305        let hashed_account = provider_rw
5306            .tx
5307            .cursor_read::<tables::HashedAccounts>()
5308            .unwrap()
5309            .seek_exact(hashed_address)
5310            .unwrap()
5311            .unwrap()
5312            .1;
5313        assert_eq!(hashed_account.nonce, 1);
5314
5315        let hashed_entry = provider_rw
5316            .tx
5317            .cursor_dup_read::<tables::HashedStorages>()
5318            .unwrap()
5319            .seek_by_key_subkey(hashed_address, hashed_slot)
5320            .unwrap()
5321            .unwrap();
5322        assert_eq!(hashed_entry.key, hashed_slot);
5323        assert_eq!(hashed_entry.value, U256::from(10));
5324
5325        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5326        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5327
5328        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5329        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5330
5331        provider_rw.static_file_provider().commit().unwrap();
5332
5333        let sf = factory.static_file_provider();
5334        let storage_cs = sf.storage_changeset(1).unwrap();
5335        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5336        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5337
5338        provider_rw.remove_state_above(0).unwrap();
5339
5340        let restored_account = provider_rw
5341            .tx
5342            .cursor_read::<tables::HashedAccounts>()
5343            .unwrap()
5344            .seek_exact(hashed_address)
5345            .unwrap();
5346        assert!(
5347            restored_account.is_none(),
5348            "v2: account should be removed (didn't exist before block 1)"
5349        );
5350
5351        let storage_gone = provider_rw
5352            .tx
5353            .cursor_dup_read::<tables::HashedStorages>()
5354            .unwrap()
5355            .seek_by_key_subkey(hashed_address, hashed_slot)
5356            .unwrap();
5357        assert!(
5358            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5359            "v2: storage should be reverted (removed or different key)"
5360        );
5361
5362        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5363        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5364
5365        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5366        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5367    }
5368
5369    #[test]
5370    #[cfg(all(unix, feature = "rocksdb"))]
5371    fn test_unwind_storage_history_indices_v2() {
5372        let factory = create_test_provider_factory();
5373        factory.set_storage_settings_cache(StorageSettings::v2());
5374
5375        let address = Address::with_last_byte(1);
5376        let slot_key = B256::from(U256::from(42));
5377
5378        {
5379            let rocksdb = factory.rocksdb_provider();
5380            let mut batch = rocksdb.batch();
5381            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5382            batch.commit().unwrap();
5383
5384            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5385            assert!(!shards.is_empty(), "history should be written to rocksdb");
5386        }
5387
5388        let provider_rw = factory.provider_rw().unwrap();
5389
5390        let changesets = vec![
5391            (
5392                BlockNumberAddress((7, address)),
5393                StorageEntry { key: slot_key, value: U256::from(5) },
5394            ),
5395            (
5396                BlockNumberAddress((10, address)),
5397                StorageEntry { key: slot_key, value: U256::from(8) },
5398            ),
5399        ];
5400
5401        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5402        assert_eq!(count, 2);
5403
5404        provider_rw.commit().unwrap();
5405
5406        let rocksdb = factory.rocksdb_provider();
5407        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5408
5409        assert!(
5410            !shards.is_empty(),
5411            "history shards should still exist with block 3 after partial unwind"
5412        );
5413
5414        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5415        assert!(all_blocks.contains(&3), "block 3 should remain");
5416        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5417        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5418    }
5419}