Skip to main content

reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::{Database, ReaderTxnTracker},
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65    StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77    cmp::Ordering,
78    collections::{BTreeMap, BTreeSet},
79    fmt::Debug,
80    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81    path::PathBuf,
82    sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
86/// Determines the commit order for database operations.
87#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
88pub enum CommitOrder {
89    /// Normal commit order: static files first, then `RocksDB`, then MDBX.
90    #[default]
91    Normal,
92    /// Unwind commit order: MDBX first, then `RocksDB`, then static files.
93    /// Used for unwind operations to allow recovery by truncating static files on restart.
94    Unwind,
95}
96
97impl CommitOrder {
98    /// Returns true if this is unwind commit order.
99    pub const fn is_unwind(&self) -> bool {
100        matches!(self, Self::Unwind)
101    }
102}
103
104/// A [`DatabaseProvider`] that holds a read-only database transaction.
105pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
107/// A [`DatabaseProvider`] that holds a read-write database transaction.
108///
109/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
110/// Once that issue is solved, we can probably revert back to being an alias type.
111#[derive(Debug)]
112pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
113    pub DatabaseProvider<<DB as Database>::TXMut, N>,
114);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119    fn deref(&self) -> &Self::Target {
120        &self.0
121    }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125    fn deref_mut(&mut self) -> &mut Self::Target {
126        &mut self.0
127    }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131    for DatabaseProviderRW<DB, N>
132{
133    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134        &self.0
135    }
136}
137
138impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
139    /// Commit database transaction and static file if it exists.
140    pub fn commit(self) -> ProviderResult<()> {
141        self.0.commit()
142    }
143
144    /// Consume `DbTx` or `DbTxMut`.
145    pub fn into_tx(self) -> <DB as Database>::TXMut {
146        self.0.into_tx()
147    }
148
149    /// Override the minimum pruning distance for testing purposes.
150    #[cfg(any(test, feature = "test-utils"))]
151    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
152        self.0.minimum_pruning_distance = distance;
153        self
154    }
155}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158    for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161        provider.0
162    }
163}
164
165/// Mode for [`DatabaseProvider::save_blocks`].
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum SaveBlocksMode {
168    /// Full mode: write block structure + receipts + state + trie.
169    /// Used by engine/production code.
170    Full,
171    /// Blocks only: write block structure (headers, txs, senders, indices).
172    /// Receipts/state/trie are skipped - they may come later via separate calls.
173    /// Used by `insert_block`.
174    BlocksOnly,
175}
176
177impl SaveBlocksMode {
178    /// Returns `true` if this is [`SaveBlocksMode::Full`].
179    pub const fn with_state(self) -> bool {
180        matches!(self, Self::Full)
181    }
182}
183
184/// A provider struct that fetches data from the database.
185/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
186pub struct DatabaseProvider<TX, N: NodeTypes> {
187    /// Database transaction.
188    tx: TX,
189    /// Chain spec
190    chain_spec: Arc<N::ChainSpec>,
191    /// Static File provider
192    static_file_provider: StaticFileProvider<N::Primitives>,
193    /// Pruning configuration
194    prune_modes: PruneModes,
195    /// Node storage handler.
196    storage: Arc<N::Storage>,
197    /// Storage configuration settings for this node
198    storage_settings: Arc<RwLock<StorageSettings>>,
199    /// `RocksDB` provider
200    rocksdb_provider: RocksDBProvider,
201    /// Changeset cache for trie unwinding
202    changeset_cache: ChangesetCache,
203    /// Task runtime for spawning parallel I/O work.
204    runtime: reth_tasks::Runtime,
205    /// Path to the database directory.
206    db_path: PathBuf,
207    /// Pending `RocksDB` batches to be committed at provider commit time.
208    pending_rocksdb_batches: PendingRocksDBBatches,
209    /// Commit order for database operations.
210    commit_order: CommitOrder,
211    /// Minimum distance from tip required for pruning
212    minimum_pruning_distance: u64,
213    /// Database provider metrics
214    metrics: metrics::DatabaseProviderMetrics,
215    /// Database handle used to inspect active MDBX readers during unwind commits.
216    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
217}
218
219impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        let mut s = f.debug_struct("DatabaseProvider");
222        s.field("tx", &self.tx)
223            .field("chain_spec", &self.chain_spec)
224            .field("static_file_provider", &self.static_file_provider)
225            .field("prune_modes", &self.prune_modes)
226            .field("storage", &self.storage)
227            .field("storage_settings", &self.storage_settings)
228            .field("rocksdb_provider", &self.rocksdb_provider)
229            .field("changeset_cache", &self.changeset_cache)
230            .field("runtime", &self.runtime)
231            .field("pending_rocksdb_batches", &"<pending batches>")
232            .field("commit_order", &self.commit_order)
233            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
234            .field("reader_txn_tracker", &"<reader txn tracker>")
235            .finish()
236    }
237}
238
239impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
240    /// Returns reference to prune modes.
241    pub const fn prune_modes_ref(&self) -> &PruneModes {
242        &self.prune_modes
243    }
244
245    /// Sets the minimum pruning distance.
246    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
247        self.minimum_pruning_distance = distance;
248        self
249    }
250
251    /// Attaches reader tracking so unwind commits can wait on active readers.
252    pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
253    where
254        T: ReaderTxnTracker + 'static,
255    {
256        self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
257        self
258    }
259}
260
261impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
262    /// Commits unwind writes in MDBX -> `RocksDB` -> static-file order.
263    ///
264    /// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
265    /// truncating static files from checkpoints on the next startup.
266    ///
267    /// This waits after the MDBX commit so readers holding older MDBX-visible views cannot overlap
268    /// later cross-store unwind steps.
269    ///
270    /// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
271    /// so no additional post-`RocksDB` wait is needed before static-file commit.
272    fn commit_unwind(self) -> ProviderResult<()> {
273        let storage_v2 = self.cached_storage_settings().storage_v2;
274        let reader_txn_tracker = self.reader_txn_tracker.clone();
275        self.tx.commit()?;
276
277        if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
278            reader_txn_tracker.wait_for_pre_commit_readers();
279        }
280
281        if storage_v2 {
282            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
283            for batch in batches {
284                self.rocksdb_provider.commit_batch(batch)?;
285            }
286        }
287
288        self.static_file_provider.commit()?;
289        Ok(())
290    }
291
292    /// State provider for latest state
293    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
294        trace!(target: "providers::db", "Returning latest state provider");
295        Box::new(LatestStateProviderRef::new(self))
296    }
297
298    /// Storage provider for state at that given block hash
299    pub fn history_by_block_hash<'a>(
300        &'a self,
301        block_hash: BlockHash,
302    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
303        let block_number =
304            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
305        self.history_by_block_number(block_number)
306    }
307
308    /// Storage provider for state at that given block number
309    pub fn history_by_block_number<'a>(
310        &'a self,
311        mut block_number: BlockNumber,
312    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
313        if block_number == self.best_block_number().unwrap_or_default() &&
314            block_number == self.last_block_number().unwrap_or_default()
315        {
316            return Ok(Box::new(LatestStateProviderRef::new(self)))
317        }
318
319        // +1 as the changeset that we want is the one that was applied after this block.
320        block_number += 1;
321
322        let account_history_prune_checkpoint =
323            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
324        let storage_history_prune_checkpoint =
325            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
326
327        let mut state_provider =
328            HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
329        // If we pruned account or storage history, we can't return state on every historical block.
330        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
331        if let Some(prune_checkpoint_block_number) =
332            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
333        {
334            state_provider = state_provider.with_lowest_available_account_history_block_number(
335                prune_checkpoint_block_number + 1,
336            );
337        }
338        if let Some(prune_checkpoint_block_number) =
339            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
340        {
341            state_provider = state_provider.with_lowest_available_storage_history_block_number(
342                prune_checkpoint_block_number + 1,
343            );
344        }
345
346        Ok(Box::new(state_provider))
347    }
348
349    #[cfg(feature = "test-utils")]
350    /// Sets the prune modes for provider.
351    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
352        self.prune_modes = prune_modes;
353    }
354}
355
356impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
357    type Primitives = N::Primitives;
358}
359
360impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
361    /// Returns a static file provider
362    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
363        self.static_file_provider.clone()
364    }
365
366    fn get_static_file_writer(
367        &self,
368        block: BlockNumber,
369        segment: StaticFileSegment,
370    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
371        self.static_file_provider.get_writer(block, segment)
372    }
373}
374
375impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
376    /// Returns the `RocksDB` provider.
377    fn rocksdb_provider(&self) -> RocksDBProvider {
378        self.rocksdb_provider.clone()
379    }
380
381    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
382        self.pending_rocksdb_batches.lock().push(batch);
383    }
384
385    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
386        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
387        for batch in batches {
388            self.rocksdb_provider.commit_batch(batch)?;
389        }
390        Ok(())
391    }
392}
393
394impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
395    for DatabaseProvider<TX, N>
396{
397    type ChainSpec = N::ChainSpec;
398
399    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
400        self.chain_spec.clone()
401    }
402}
403
404impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
405    /// Creates a provider with an inner read-write transaction.
406    #[expect(clippy::too_many_arguments)]
407    fn new_rw_inner(
408        tx: TX,
409        chain_spec: Arc<N::ChainSpec>,
410        static_file_provider: StaticFileProvider<N::Primitives>,
411        prune_modes: PruneModes,
412        storage: Arc<N::Storage>,
413        storage_settings: Arc<RwLock<StorageSettings>>,
414        rocksdb_provider: RocksDBProvider,
415        changeset_cache: ChangesetCache,
416        runtime: reth_tasks::Runtime,
417        db_path: PathBuf,
418        commit_order: CommitOrder,
419    ) -> Self {
420        Self {
421            tx,
422            chain_spec,
423            static_file_provider,
424            prune_modes,
425            storage,
426            storage_settings,
427            rocksdb_provider,
428            changeset_cache,
429            runtime,
430            db_path,
431            pending_rocksdb_batches: Default::default(),
432            commit_order,
433            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
434            metrics: metrics::DatabaseProviderMetrics::default(),
435            reader_txn_tracker: None,
436        }
437    }
438
439    /// Creates a provider with an inner read-write transaction using normal commit order.
440    #[expect(clippy::too_many_arguments)]
441    pub fn new_rw(
442        tx: TX,
443        chain_spec: Arc<N::ChainSpec>,
444        static_file_provider: StaticFileProvider<N::Primitives>,
445        prune_modes: PruneModes,
446        storage: Arc<N::Storage>,
447        storage_settings: Arc<RwLock<StorageSettings>>,
448        rocksdb_provider: RocksDBProvider,
449        changeset_cache: ChangesetCache,
450        runtime: reth_tasks::Runtime,
451        db_path: PathBuf,
452    ) -> Self {
453        Self::new_rw_inner(
454            tx,
455            chain_spec,
456            static_file_provider,
457            prune_modes,
458            storage,
459            storage_settings,
460            rocksdb_provider,
461            changeset_cache,
462            runtime,
463            db_path,
464            CommitOrder::Normal,
465        )
466    }
467
468    /// Creates a provider with an inner read-write transaction using unwind commit order.
469    #[expect(clippy::too_many_arguments)]
470    pub fn new_unwind_rw(
471        tx: TX,
472        chain_spec: Arc<N::ChainSpec>,
473        static_file_provider: StaticFileProvider<N::Primitives>,
474        prune_modes: PruneModes,
475        storage: Arc<N::Storage>,
476        storage_settings: Arc<RwLock<StorageSettings>>,
477        rocksdb_provider: RocksDBProvider,
478        changeset_cache: ChangesetCache,
479        runtime: reth_tasks::Runtime,
480        db_path: PathBuf,
481    ) -> Self {
482        Self::new_rw_inner(
483            tx,
484            chain_spec,
485            static_file_provider,
486            prune_modes,
487            storage,
488            storage_settings,
489            rocksdb_provider,
490            changeset_cache,
491            runtime,
492            db_path,
493            CommitOrder::Unwind,
494        )
495    }
496}
497
498impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
499    fn as_ref(&self) -> &Self {
500        self
501    }
502}
503
504impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
505    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
506    ///
507    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
508    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
509    where
510        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
511    {
512        let rocksdb = self.rocksdb_provider();
513        let rocksdb_batch = rocksdb.batch();
514
515        let (result, raw_batch) = f(rocksdb_batch)?;
516
517        if let Some(batch) = raw_batch {
518            self.set_pending_rocksdb_batch(batch);
519        }
520        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
521
522        Ok(result)
523    }
524
525    /// Creates the context for static file writes.
526    fn static_file_write_ctx(
527        &self,
528        save_mode: SaveBlocksMode,
529        first_block: BlockNumber,
530        last_block: BlockNumber,
531    ) -> ProviderResult<StaticFileWriteCtx> {
532        let tip = self.last_block_number()?.max(last_block);
533        Ok(StaticFileWriteCtx {
534            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
535                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
536            write_receipts: save_mode.with_state() &&
537                EitherWriter::receipts_destination(self).is_static_file(),
538            write_account_changesets: save_mode.with_state() &&
539                EitherWriterDestination::account_changesets(self).is_static_file(),
540            write_storage_changesets: save_mode.with_state() &&
541                EitherWriterDestination::storage_changesets(self).is_static_file(),
542            tip,
543            receipts_prune_mode: self.prune_modes.receipts,
544            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
545            receipts_prunable: self
546                .static_file_provider
547                .get_highest_static_file_tx(StaticFileSegment::Receipts)
548                .is_none() &&
549                PruneMode::Distance(self.minimum_pruning_distance)
550                    .should_prune(first_block, tip),
551        })
552    }
553
554    /// Creates the context for `RocksDB` writes.
555    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
556        RocksDBWriteCtx {
557            first_block_number: first_block,
558            prune_tx_lookup: self.prune_modes.transaction_lookup,
559            storage_settings: self.cached_storage_settings(),
560            pending_batches: self.pending_rocksdb_batches.clone(),
561        }
562    }
563
564    /// Writes executed blocks and state to storage.
565    ///
566    /// This method parallelizes static file (SF) writes with MDBX writes.
567    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
568    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
569    ///
570    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
571    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
572    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
573    pub fn save_blocks(
574        &self,
575        blocks: Vec<ExecutedBlock<N::Primitives>>,
576        save_mode: SaveBlocksMode,
577    ) -> ProviderResult<()> {
578        if blocks.is_empty() {
579            debug!(target: "providers::db", "Attempted to write empty block range");
580            return Ok(())
581        }
582
583        let total_start = Instant::now();
584        let block_count = blocks.len() as u64;
585        let first_number = blocks.first().unwrap().recovered_block().number();
586        let last_block_number = blocks.last().unwrap().recovered_block().number();
587
588        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
589
590        // Compute tx_nums upfront (both threads need these)
591        let first_tx_num = self
592            .tx
593            .cursor_read::<tables::TransactionBlocks>()?
594            .last()?
595            .map(|(n, _)| n + 1)
596            .unwrap_or_default();
597
598        let tx_nums: Vec<TxNumber> = {
599            let mut nums = Vec::with_capacity(blocks.len());
600            let mut current = first_tx_num;
601            for block in &blocks {
602                nums.push(current);
603                current += block.recovered_block().body().transaction_count() as u64;
604            }
605            nums
606        };
607
608        let mut timings =
609            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
610
611        // avoid capturing &self.tx in scope below.
612        let sf_provider = &self.static_file_provider;
613        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
614        let rocksdb_provider = self.rocksdb_provider.clone();
615        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
616        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
617
618        let mut sf_result = None;
619        let mut rocksdb_result = None;
620
621        // Write to all backends in parallel.
622        let runtime = &self.runtime;
623        // Propagate tracing context into rayon-spawned threads so that static file
624        // and RocksDB write spans appear as children of save_blocks in traces.
625        let span = tracing::Span::current();
626        runtime.storage_pool().in_place_scope(|s| {
627            // SF writes
628            s.spawn(|_| {
629                let _guard = span.enter();
630                let start = Instant::now();
631                sf_result = Some(
632                    sf_provider
633                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
634                        .map(|()| start.elapsed()),
635                );
636            });
637
638            // RocksDB writes
639            if rocksdb_enabled {
640                s.spawn(|_| {
641                    let _guard = span.enter();
642                    let start = Instant::now();
643                    rocksdb_result = Some(
644                        rocksdb_provider
645                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
646                            .map(|()| start.elapsed()),
647                    );
648                });
649            }
650
651            // MDBX writes
652            let mdbx_start = Instant::now();
653
654            // Collect all transaction hashes across all blocks, sort them, and write in batch
655            if !self.cached_storage_settings().storage_v2 &&
656                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
657            {
658                let start = Instant::now();
659                let total_tx_count: usize =
660                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
661                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
662                for (i, block) in blocks.iter().enumerate() {
663                    let recovered_block = block.recovered_block();
664                    for (tx_num, transaction) in
665                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
666                    {
667                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
668                    }
669                }
670
671                // Sort by hash for optimal MDBX insertion performance
672                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
673
674                // Write all transaction hash numbers in a single batch
675                self.with_rocksdb_batch(|batch| {
676                    let mut tx_hash_writer =
677                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
678                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
679                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
680                    Ok(((), raw_batch))
681                })?;
682                self.metrics.record_duration(
683                    metrics::Action::InsertTransactionHashNumbers,
684                    start.elapsed(),
685                );
686            }
687
688            for (i, block) in blocks.iter().enumerate() {
689                let recovered_block = block.recovered_block();
690
691                let start = Instant::now();
692                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
693                timings.insert_block += start.elapsed();
694
695                if save_mode.with_state() {
696                    let execution_output = block.execution_outcome();
697
698                    // Write state and changesets to the database.
699                    // Must be written after blocks because of the receipt lookup.
700                    // Skip receipts/account changesets if they're being written to static files.
701                    let start = Instant::now();
702                    self.write_state(
703                        WriteStateInput::Single {
704                            outcome: execution_output,
705                            block: recovered_block.number(),
706                        },
707                        OriginalValuesKnown::No,
708                        StateWriteConfig {
709                            write_receipts: !sf_ctx.write_receipts,
710                            write_account_changesets: !sf_ctx.write_account_changesets,
711                            write_storage_changesets: !sf_ctx.write_storage_changesets,
712                        },
713                    )?;
714                    timings.write_state += start.elapsed();
715                }
716            }
717
718            // Write all hashed state and trie updates in single batches.
719            // This reduces cursor open/close overhead from N calls to 1.
720            if save_mode.with_state() {
721                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
722                let start = Instant::now();
723                let merged_hashed_state = HashedPostStateSorted::merge_batch(
724                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
725                );
726                if !merged_hashed_state.is_empty() {
727                    self.write_hashed_state(&merged_hashed_state)?;
728                }
729                timings.write_hashed_state += start.elapsed();
730
731                let start = Instant::now();
732                let merged_trie =
733                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
734                if !merged_trie.is_empty() {
735                    self.write_trie_updates_sorted(&merged_trie)?;
736                }
737                timings.write_trie_updates += start.elapsed();
738            }
739
740            // Full mode: update history indices
741            if save_mode.with_state() {
742                let start = Instant::now();
743                self.update_history_indices(first_number..=last_block_number)?;
744                timings.update_history_indices = start.elapsed();
745            }
746
747            // Update pipeline progress
748            let start = Instant::now();
749            self.update_pipeline_stages(last_block_number, false)?;
750            timings.update_pipeline_stages = start.elapsed();
751
752            timings.mdbx = mdbx_start.elapsed();
753
754            Ok::<_, ProviderError>(())
755        })?;
756
757        // Collect results from spawned tasks
758        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
759
760        if rocksdb_enabled {
761            timings.rocksdb = rocksdb_result.ok_or_else(|| {
762                ProviderError::Database(reth_db_api::DatabaseError::Other(
763                    "RocksDB thread panicked".into(),
764                ))
765            })??;
766        }
767
768        timings.total = total_start.elapsed();
769
770        self.metrics.record_save_blocks(&timings);
771        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
772
773        Ok(())
774    }
775
776    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
777    ///
778    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
779    #[instrument(level = "debug", target = "providers::db", skip_all)]
780    fn insert_block_mdbx_only(
781        &self,
782        block: &RecoveredBlock<BlockTy<N>>,
783        first_tx_num: TxNumber,
784    ) -> ProviderResult<StoredBlockBodyIndices> {
785        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
786            EitherWriterDestination::senders(self).is_database()
787        {
788            let start = Instant::now();
789            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
790            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
791            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
792                cursor.append(tx_num, &sender)?;
793            }
794            self.metrics
795                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
796        }
797
798        let block_number = block.number();
799        let tx_count = block.body().transaction_count() as u64;
800
801        let start = Instant::now();
802        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
803        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
804
805        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
806
807        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
808    }
809
810    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
811    /// `Ommers`/`Withdrawals`).
812    fn write_block_body_indices(
813        &self,
814        block_number: BlockNumber,
815        body: &BodyTy<N>,
816        first_tx_num: TxNumber,
817        tx_count: u64,
818    ) -> ProviderResult<()> {
819        // MDBX: BlockBodyIndices
820        let start = Instant::now();
821        self.tx
822            .cursor_write::<tables::BlockBodyIndices>()?
823            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
824        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
825
826        // MDBX: TransactionBlocks (last tx -> block mapping)
827        if tx_count > 0 {
828            let start = Instant::now();
829            self.tx
830                .cursor_write::<tables::TransactionBlocks>()?
831                .append(first_tx_num + tx_count - 1, &block_number)?;
832            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
833        }
834
835        // MDBX: Ommers/Withdrawals
836        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
837
838        Ok(())
839    }
840
841    /// Unwinds trie state starting at and including the given block.
842    ///
843    /// This includes calculating the resulted state root and comparing it with the parent block
844    /// state root.
845    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
846        let changed_accounts = self.account_changesets_range(from..)?;
847
848        // Unwind account hashes.
849        self.unwind_account_hashing(changed_accounts.iter())?;
850
851        // Unwind account history indices.
852        self.unwind_account_history_indices(changed_accounts.iter())?;
853
854        let changed_storages = self.storage_changesets_range(from..)?;
855
856        // Unwind storage hashes.
857        self.unwind_storage_hashing(changed_storages.iter().copied())?;
858
859        // Unwind storage history indices.
860        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
861
862        // Unwind accounts/storages trie tables using the revert.
863        // Get the database tip block number
864        let db_tip_block = self
865            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
866            .as_ref()
867            .map(|chk| chk.block_number)
868            .ok_or_else(|| ProviderError::InsufficientChangesets {
869                requested: from,
870                available: 0..=0,
871            })?;
872
873        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
874        self.write_trie_updates_sorted(&trie_revert)?;
875
876        Ok(())
877    }
878
879    /// Removes receipts from all transactions starting with provided number (inclusive).
880    fn remove_receipts_from(
881        &self,
882        from_tx: TxNumber,
883        last_block: BlockNumber,
884    ) -> ProviderResult<()> {
885        // iterate over block body and remove receipts
886        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
887
888        if EitherWriter::receipts_destination(self).is_static_file() {
889            let static_file_receipt_num =
890                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
891
892            let to_delete = static_file_receipt_num
893                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
894                .unwrap_or_default();
895
896            self.static_file_provider
897                .latest_writer(StaticFileSegment::Receipts)?
898                .prune_receipts(to_delete, last_block)?;
899        }
900
901        Ok(())
902    }
903
904    /// Writes bytecodes to MDBX.
905    fn write_bytecodes(
906        &self,
907        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
908    ) -> ProviderResult<()> {
909        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
910        for (hash, bytecode) in bytecodes {
911            bytecodes_cursor.upsert(hash, &bytecode)?;
912        }
913        Ok(())
914    }
915}
916
917impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
918    fn try_into_history_at_block(
919        self,
920        mut block_number: BlockNumber,
921    ) -> ProviderResult<StateProviderBox> {
922        let best_block = self.best_block_number().unwrap_or_default();
923
924        // Reject requests for blocks beyond the best block
925        if block_number > best_block {
926            return Err(ProviderError::BlockNotExecuted {
927                requested: block_number,
928                executed: best_block,
929            });
930        }
931
932        // If requesting state at the best block, use the latest state provider
933        if block_number == best_block {
934            return Ok(Box::new(LatestStateProvider::new(self)));
935        }
936
937        // +1 as the changeset that we want is the one that was applied after this block.
938        block_number += 1;
939
940        let account_history_prune_checkpoint =
941            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
942        let storage_history_prune_checkpoint =
943            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
944        let changeset_cache = self.changeset_cache.clone();
945
946        let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
947
948        // If we pruned account or storage history, we can't return state on every historical block.
949        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
950        if let Some(prune_checkpoint_block_number) =
951            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
952        {
953            state_provider = state_provider.with_lowest_available_account_history_block_number(
954                prune_checkpoint_block_number + 1,
955            );
956        }
957        if let Some(prune_checkpoint_block_number) =
958            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
959        {
960            state_provider = state_provider.with_lowest_available_storage_history_block_number(
961                prune_checkpoint_block_number + 1,
962            );
963        }
964
965        Ok(Box::new(state_provider))
966    }
967}
968
969/// For a given key, unwind all history shards that contain block numbers at or above the given
970/// block number.
971///
972/// S - Sharded key subtype.
973/// T - Table to walk over.
974/// C - Cursor implementation.
975///
976/// This function walks the entries from the given start key and deletes all shards that belong to
977/// the key and contain block numbers at or above the given block number. Shards entirely below
978/// the block number are preserved.
979///
980/// The boundary shard (the shard that spans across the block number) is removed from the database.
981/// Any indices that are below the block number are filtered out and returned for reinsertion.
982/// The boundary shard is returned for reinsertion (if it's not empty).
983fn unwind_history_shards<S, T, C>(
984    cursor: &mut C,
985    start_key: T::Key,
986    block_number: BlockNumber,
987    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
988) -> ProviderResult<Vec<u64>>
989where
990    T: Table<Value = BlockNumberList>,
991    T::Key: AsRef<ShardedKey<S>>,
992    C: DbCursorRO<T> + DbCursorRW<T>,
993{
994    // Start from the given key and iterate through shards
995    let mut item = cursor.seek_exact(start_key)?;
996    while let Some((sharded_key, list)) = item {
997        // If the shard does not belong to the key, break.
998        if !shard_belongs_to_key(&sharded_key) {
999            break
1000        }
1001
1002        // Always delete the current shard from the database first
1003        // We'll decide later what (if anything) to reinsert
1004        cursor.delete_current()?;
1005
1006        // Get the first (lowest) block number in this shard
1007        // All block numbers in a shard are sorted in ascending order
1008        let first = list.iter().next().expect("List can't be empty");
1009
1010        // Case 1: Entire shard is at or above the unwinding point
1011        // Keep it deleted (don't return anything for reinsertion)
1012        if first >= block_number {
1013            item = cursor.prev()?;
1014            continue
1015        }
1016        // Case 2: This is a boundary shard (spans across the unwinding point)
1017        // The shard contains some blocks below and some at/above the unwinding point
1018        else if block_number <= sharded_key.as_ref().highest_block_number {
1019            // Return only the block numbers that are below the unwinding point
1020            // These will be reinserted to preserve the historical data
1021            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1022        }
1023        // Case 3: Entire shard is below the unwinding point
1024        // Return all block numbers for reinsertion (preserve entire shard)
1025        return Ok(list.iter().collect::<Vec<_>>())
1026    }
1027
1028    // No shards found or all processed
1029    Ok(Vec::new())
1030}
1031
1032impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1033    /// Creates a provider with an inner read-only transaction.
1034    #[expect(clippy::too_many_arguments)]
1035    pub fn new(
1036        tx: TX,
1037        chain_spec: Arc<N::ChainSpec>,
1038        static_file_provider: StaticFileProvider<N::Primitives>,
1039        prune_modes: PruneModes,
1040        storage: Arc<N::Storage>,
1041        storage_settings: Arc<RwLock<StorageSettings>>,
1042        rocksdb_provider: RocksDBProvider,
1043        changeset_cache: ChangesetCache,
1044        runtime: reth_tasks::Runtime,
1045        db_path: PathBuf,
1046    ) -> Self {
1047        Self {
1048            tx,
1049            chain_spec,
1050            static_file_provider,
1051            prune_modes,
1052            storage,
1053            storage_settings,
1054            rocksdb_provider,
1055            changeset_cache,
1056            runtime,
1057            db_path,
1058            pending_rocksdb_batches: Default::default(),
1059            commit_order: CommitOrder::Normal,
1060            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1061            metrics: metrics::DatabaseProviderMetrics::default(),
1062            reader_txn_tracker: None,
1063        }
1064    }
1065
1066    /// Consume `DbTx` or `DbTxMut`.
1067    pub fn into_tx(self) -> TX {
1068        self.tx
1069    }
1070
1071    /// Pass `DbTx` or `DbTxMut` mutable reference.
1072    pub const fn tx_mut(&mut self) -> &mut TX {
1073        &mut self.tx
1074    }
1075
1076    /// Pass `DbTx` or `DbTxMut` immutable reference.
1077    pub const fn tx_ref(&self) -> &TX {
1078        &self.tx
1079    }
1080
1081    /// Returns a reference to the chain specification.
1082    pub fn chain_spec(&self) -> &N::ChainSpec {
1083        &self.chain_spec
1084    }
1085}
1086
1087impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1088    fn recovered_block<H, HF, B, BF>(
1089        &self,
1090        id: BlockHashOrNumber,
1091        _transaction_kind: TransactionVariant,
1092        header_by_number: HF,
1093        construct_block: BF,
1094    ) -> ProviderResult<Option<B>>
1095    where
1096        H: AsRef<HeaderTy<N>>,
1097        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1098        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1099    {
1100        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1101        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1102
1103        // Get the block body
1104        //
1105        // If the body indices are not found, this means that the transactions either do not exist
1106        // in the database yet, or they do exit but are not indexed. If they exist but are not
1107        // indexed, we don't have enough information to return the block anyways, so we return
1108        // `None`.
1109        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1110
1111        let tx_range = body.tx_num_range();
1112
1113        let transactions = if tx_range.is_empty() {
1114            vec![]
1115        } else {
1116            self.transactions_by_tx_range(tx_range.clone())?
1117        };
1118
1119        let body = self
1120            .storage
1121            .reader()
1122            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1123            .pop()
1124            .ok_or(ProviderError::InvalidStorageOutput)?;
1125
1126        let senders = if tx_range.is_empty() {
1127            vec![]
1128        } else {
1129            let known_senders: HashMap<TxNumber, Address> =
1130                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1131
1132            let mut senders = Vec::with_capacity(body.transactions().len());
1133            for (tx_num, tx) in tx_range.zip(body.transactions()) {
1134                match known_senders.get(&tx_num) {
1135                    None => {
1136                        let sender = tx.recover_signer_unchecked()?;
1137                        senders.push(sender);
1138                    }
1139                    Some(sender) => senders.push(*sender),
1140                }
1141            }
1142            senders
1143        };
1144
1145        construct_block(header, body, senders)
1146    }
1147
1148    /// Returns a range of blocks from the database.
1149    ///
1150    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1151    /// construct blocks from the following inputs:
1152    ///     – Header
1153    ///     - Range of transaction numbers
1154    ///     – Ommers
1155    ///     – Withdrawals
1156    ///     – Senders
1157    fn block_range<F, H, HF, R>(
1158        &self,
1159        range: RangeInclusive<BlockNumber>,
1160        headers_range: HF,
1161        mut assemble_block: F,
1162    ) -> ProviderResult<Vec<R>>
1163    where
1164        H: AsRef<HeaderTy<N>>,
1165        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1166        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1167    {
1168        if range.is_empty() {
1169            return Ok(Vec::new())
1170        }
1171
1172        let len = range.end().saturating_sub(*range.start()) as usize + 1;
1173        let mut blocks = Vec::with_capacity(len);
1174
1175        let headers = headers_range(range.clone())?;
1176
1177        // If the body indices are not found, this means that the transactions either do
1178        // not exist in the database yet, or they do exit but are
1179        // not indexed. If they exist but are not indexed, we don't
1180        // have enough information to return the block anyways, so
1181        // we skip the block.
1182        let present_headers = self
1183            .block_body_indices_range(range)?
1184            .into_iter()
1185            .map(|b| b.tx_num_range())
1186            .zip(headers)
1187            .collect::<Vec<_>>();
1188
1189        let mut inputs = Vec::with_capacity(present_headers.len());
1190        for (tx_range, header) in &present_headers {
1191            let transactions = if tx_range.is_empty() {
1192                Vec::new()
1193            } else {
1194                self.transactions_by_tx_range(tx_range.clone())?
1195            };
1196
1197            inputs.push((header.as_ref(), transactions));
1198        }
1199
1200        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1201
1202        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1203            blocks.push(assemble_block(header, body, tx_range)?);
1204        }
1205
1206        Ok(blocks)
1207    }
1208
1209    /// Returns a range of blocks from the database, along with the senders of each
1210    /// transaction in the blocks.
1211    ///
1212    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1213    /// construct blocks from the following inputs:
1214    ///     – Header
1215    ///     - Transactions
1216    ///     – Ommers
1217    ///     – Withdrawals
1218    ///     – Senders
1219    fn block_with_senders_range<H, HF, B, BF>(
1220        &self,
1221        range: RangeInclusive<BlockNumber>,
1222        headers_range: HF,
1223        assemble_block: BF,
1224    ) -> ProviderResult<Vec<B>>
1225    where
1226        H: AsRef<HeaderTy<N>>,
1227        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1228        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1229    {
1230        self.block_range(range, headers_range, |header, body, tx_range| {
1231            let senders = if tx_range.is_empty() {
1232                Vec::new()
1233            } else {
1234                let known_senders: HashMap<TxNumber, Address> =
1235                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1236
1237                let mut senders = Vec::with_capacity(body.transactions().len());
1238                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1239                    match known_senders.get(&tx_num) {
1240                        None => {
1241                            // recover the sender from the transaction if not found
1242                            let sender = tx.recover_signer_unchecked()?;
1243                            senders.push(sender);
1244                        }
1245                        Some(sender) => senders.push(*sender),
1246                    }
1247                }
1248
1249                senders
1250            };
1251
1252            assemble_block(header, body, senders)
1253        })
1254    }
1255
1256    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
1257    /// [`tables::PlainAccountState`] and [`tables::PlainStorageState`] tables, based on the given
1258    /// storage and account changesets.
1259    fn populate_bundle_state(
1260        &self,
1261        account_changeset: Vec<(u64, AccountBeforeTx)>,
1262        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1263        mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
1264        mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
1265    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1266        // iterate previous value and get plain state value to create changeset
1267        // Double option around Account represent if Account state is know (first option) and
1268        // account is removed (Second Option)
1269        let mut state: BundleStateInit = HashMap::default();
1270
1271        // This is not working for blocks that are not at tip. as plain state is not the last
1272        // state of end range. We should rename the functions or add support to access
1273        // History state. Accessing history state can be tricky but we are not gaining
1274        // anything.
1275
1276        let mut reverts: RevertsInit = HashMap::default();
1277
1278        // add account changeset changes
1279        for (block_number, account_before) in account_changeset.into_iter().rev() {
1280            let AccountBeforeTx { info: old_info, address } = account_before;
1281            match state.entry(address) {
1282                hash_map::Entry::Vacant(entry) => {
1283                    let new_info = get_account(address)?;
1284                    entry.insert((old_info, new_info, HashMap::default()));
1285                }
1286                hash_map::Entry::Occupied(mut entry) => {
1287                    // overwrite old account state.
1288                    entry.get_mut().0 = old_info;
1289                }
1290            }
1291            // insert old info into reverts.
1292            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1293        }
1294
1295        // add storage changeset changes
1296        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1297            let BlockNumberAddress((block_number, address)) = block_and_address;
1298            // get account state or insert from plain state.
1299            let account_state = match state.entry(address) {
1300                hash_map::Entry::Vacant(entry) => {
1301                    let present_info = get_account(address)?;
1302                    entry.insert((present_info, present_info, HashMap::default()))
1303                }
1304                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1305            };
1306
1307            // match storage.
1308            match account_state.2.entry(old_storage.key) {
1309                hash_map::Entry::Vacant(entry) => {
1310                    let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
1311                    entry.insert((old_storage.value, new_storage));
1312                }
1313                hash_map::Entry::Occupied(mut entry) => {
1314                    entry.get_mut().0 = old_storage.value;
1315                }
1316            };
1317
1318            reverts
1319                .entry(block_number)
1320                .or_default()
1321                .entry(address)
1322                .or_default()
1323                .1
1324                .push(old_storage);
1325        }
1326
1327        Ok((state, reverts))
1328    }
1329
1330    /// Invokes [`populate_bundle_state`](Self::populate_bundle_state) with the given plain state
1331    /// cursors.
1332    fn populate_bundle_state_plain(
1333        &self,
1334        account_changeset: Vec<(u64, AccountBeforeTx)>,
1335        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1336        plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1337        plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1338    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1339        self.populate_bundle_state(
1340            account_changeset,
1341            storage_changeset,
1342            |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1343            |address, storage_key| {
1344                Ok(plain_storage_cursor
1345                    .seek_by_key_subkey(address, storage_key)?
1346                    .filter(|s| s.key == storage_key)
1347                    .map(|s| s.value))
1348            },
1349        )
1350    }
1351
1352    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
1353    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
1354    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
1355    /// plain address and plain storage key.
1356    fn populate_bundle_state_hashed(
1357        &self,
1358        account_changeset: Vec<(u64, AccountBeforeTx)>,
1359        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1360        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1361        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1362    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1363        self.populate_bundle_state(
1364            account_changeset,
1365            storage_changeset,
1366            |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1367            |address, storage_key| {
1368                let hashed_storage_key = keccak256(storage_key);
1369                Ok(hashed_storage_cursor
1370                    .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1371                    .filter(|s| s.key == hashed_storage_key)
1372                    .map(|s| s.value))
1373            },
1374        )
1375    }
1376
1377    fn populate_bundle_state_with_provider(
1378        &self,
1379        account_changeset: Vec<(u64, AccountBeforeTx)>,
1380        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1381        state_provider: impl StateProvider,
1382    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1383        self.populate_bundle_state(
1384            account_changeset,
1385            storage_changeset,
1386            |address| state_provider.basic_account(&address),
1387            |address, storage_key| state_provider.storage(address, storage_key),
1388        )
1389    }
1390}
1391
1392impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1393    /// Insert history index to the database.
1394    ///
1395    /// For each updated partial key, this function retrieves the last shard from the database
1396    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1397    /// the shards back into the database.
1398    ///
1399    /// This function is used by history indexing stages.
1400    fn append_history_index<P, T>(
1401        &self,
1402        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1403        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1404    ) -> ProviderResult<()>
1405    where
1406        P: Copy,
1407        T: Table<Value = BlockNumberList>,
1408    {
1409        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1410        // will append duplicate entries instead of updating existing ones, causing data corruption.
1411        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1412
1413        let mut cursor = self.tx.cursor_write::<T>()?;
1414
1415        for (partial_key, indices) in index_updates {
1416            let last_key = sharded_key_factory(partial_key, u64::MAX);
1417            let mut last_shard = cursor
1418                .seek_exact(last_key.clone())?
1419                .map(|(_, list)| list)
1420                .unwrap_or_else(BlockNumberList::empty);
1421
1422            last_shard.append(indices).map_err(ProviderError::other)?;
1423
1424            // fast path: all indices fit in one shard
1425            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1426                cursor.upsert(last_key, &last_shard)?;
1427                continue;
1428            }
1429
1430            // slow path: rechunk into multiple shards
1431            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1432            let mut chunks_peekable = chunks.into_iter().peekable();
1433
1434            while let Some(chunk) = chunks_peekable.next() {
1435                let shard = BlockNumberList::new_pre_sorted(chunk);
1436                let highest_block_number = if chunks_peekable.peek().is_some() {
1437                    shard.iter().next_back().expect("`chunks` does not return empty list")
1438                } else {
1439                    // Insert last list with `u64::MAX`.
1440                    u64::MAX
1441                };
1442
1443                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1444            }
1445        }
1446
1447        Ok(())
1448    }
1449}
1450
1451impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1452    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1453        if self.cached_storage_settings().use_hashed_state() {
1454            let hashed_address = keccak256(address);
1455            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1456        } else {
1457            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1458        }
1459    }
1460}
1461
1462impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1463    fn changed_accounts_with_range(
1464        &self,
1465        range: RangeInclusive<BlockNumber>,
1466    ) -> ProviderResult<BTreeSet<Address>> {
1467        let mut reader = EitherReader::new_account_changesets(self)?;
1468
1469        reader.changed_accounts_with_range(range)
1470    }
1471
1472    fn basic_accounts(
1473        &self,
1474        iter: impl IntoIterator<Item = Address>,
1475    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1476        if self.cached_storage_settings().use_hashed_state() {
1477            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1478            Ok(iter
1479                .into_iter()
1480                .map(|address| {
1481                    let hashed_address = keccak256(address);
1482                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1483                })
1484                .collect::<Result<Vec<_>, _>>()?)
1485        } else {
1486            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1487            Ok(iter
1488                .into_iter()
1489                .map(|address| {
1490                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1491                })
1492                .collect::<Result<Vec<_>, _>>()?)
1493        }
1494    }
1495
1496    fn changed_accounts_and_blocks_with_range(
1497        &self,
1498        range: RangeInclusive<BlockNumber>,
1499    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1500        let highest_static_block = self
1501            .static_file_provider
1502            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1503
1504        if let Some(highest) = highest_static_block &&
1505            self.cached_storage_settings().storage_v2
1506        {
1507            let start = *range.start();
1508            let static_end = (*range.end()).min(highest);
1509
1510            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1511            if start <= static_end {
1512                for block in start..=static_end {
1513                    let block_changesets = self.account_block_changeset(block)?;
1514                    for changeset in block_changesets {
1515                        changed_accounts_and_blocks
1516                            .entry(changeset.address)
1517                            .or_default()
1518                            .push(block);
1519                    }
1520                }
1521            }
1522
1523            Ok(changed_accounts_and_blocks)
1524        } else {
1525            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1526
1527            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1528                BTreeMap::new(),
1529                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1530                    let (index, account) = entry?;
1531                    accounts.entry(account.address).or_default().push(index);
1532                    Ok(accounts)
1533                },
1534            )?;
1535
1536            Ok(account_transitions)
1537        }
1538    }
1539}
1540
1541impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1542    fn storage_changeset(
1543        &self,
1544        block_number: BlockNumber,
1545    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1546        if self.cached_storage_settings().storage_v2 {
1547            self.static_file_provider.storage_changeset(block_number)
1548        } else {
1549            let range = block_number..=block_number;
1550            let storage_range = BlockNumberAddress::range(range);
1551            self.tx
1552                .cursor_dup_read::<tables::StorageChangeSets>()?
1553                .walk_range(storage_range)?
1554                .map(|r| {
1555                    let (bna, entry) = r?;
1556                    Ok((bna, entry))
1557                })
1558                .collect()
1559        }
1560    }
1561
1562    fn get_storage_before_block(
1563        &self,
1564        block_number: BlockNumber,
1565        address: Address,
1566        storage_key: B256,
1567    ) -> ProviderResult<Option<StorageEntry>> {
1568        if self.cached_storage_settings().storage_v2 {
1569            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1570        } else {
1571            Ok(self
1572                .tx
1573                .cursor_dup_read::<tables::StorageChangeSets>()?
1574                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1575                .filter(|entry| entry.key == storage_key))
1576        }
1577    }
1578
1579    fn storage_changesets_range(
1580        &self,
1581        range: impl RangeBounds<BlockNumber>,
1582    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1583        if self.cached_storage_settings().storage_v2 {
1584            self.static_file_provider.storage_changesets_range(range)
1585        } else {
1586            self.tx
1587                .cursor_dup_read::<tables::StorageChangeSets>()?
1588                .walk_range(BlockNumberAddressRange::from(range))?
1589                .map(|r| {
1590                    let (bna, entry) = r?;
1591                    Ok((bna, entry))
1592                })
1593                .collect()
1594        }
1595    }
1596}
1597
1598impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1599    fn account_block_changeset(
1600        &self,
1601        block_number: BlockNumber,
1602    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1603        if self.cached_storage_settings().storage_v2 {
1604            let static_changesets =
1605                self.static_file_provider.account_block_changeset(block_number)?;
1606            Ok(static_changesets)
1607        } else {
1608            let range = block_number..=block_number;
1609            self.tx
1610                .cursor_read::<tables::AccountChangeSets>()?
1611                .walk_range(range)?
1612                .map(|result| -> ProviderResult<_> {
1613                    let (_, account_before) = result?;
1614                    Ok(account_before)
1615                })
1616                .collect()
1617        }
1618    }
1619
1620    fn get_account_before_block(
1621        &self,
1622        block_number: BlockNumber,
1623        address: Address,
1624    ) -> ProviderResult<Option<AccountBeforeTx>> {
1625        if self.cached_storage_settings().storage_v2 {
1626            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1627        } else {
1628            self.tx
1629                .cursor_dup_read::<tables::AccountChangeSets>()?
1630                .seek_by_key_subkey(block_number, address)?
1631                .filter(|acc| acc.address == address)
1632                .map(Ok)
1633                .transpose()
1634        }
1635    }
1636
1637    fn account_changesets_range(
1638        &self,
1639        range: impl core::ops::RangeBounds<BlockNumber>,
1640    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1641        if self.cached_storage_settings().storage_v2 {
1642            self.static_file_provider.account_changesets_range(range)
1643        } else {
1644            self.tx
1645                .cursor_read::<tables::AccountChangeSets>()?
1646                .walk_range(to_range(range))?
1647                .map(|r| r.map_err(Into::into))
1648                .collect()
1649        }
1650    }
1651}
1652
1653impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1654    type Receipt = ReceiptTy<N>;
1655
1656    fn get_state(
1657        &self,
1658        block: BlockNumber,
1659    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1660        let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1661
1662        let from_transaction_num = block_body.first_tx_num();
1663        let to_transaction_num = block_body.last_tx_num();
1664
1665        let account_changeset = self.account_changesets_range(block..=block)?;
1666        let storage_changeset = self.storage_changeset(block)?;
1667
1668        let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1669        let state_provider = self.history_by_block_hash(block_hash)?;
1670        let (state, reverts) = self.populate_bundle_state_with_provider(
1671            account_changeset,
1672            storage_changeset,
1673            state_provider,
1674        )?;
1675
1676        let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1677
1678        Ok(Some(ExecutionOutcome::new_init(
1679            state,
1680            reverts,
1681            // We skip new contracts since we never delete them from the database
1682            Vec::new(),
1683            vec![receipts],
1684            block,
1685            Vec::new(),
1686        )))
1687    }
1688}
1689
1690impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1691    for DatabaseProvider<TX, N>
1692{
1693    type Header = HeaderTy<N>;
1694
1695    fn local_tip_header(
1696        &self,
1697        highest_uninterrupted_block: BlockNumber,
1698    ) -> ProviderResult<SealedHeader<Self::Header>> {
1699        let static_file_provider = self.static_file_provider();
1700
1701        // Make sure Headers static file is at the same height. If it's further, this
1702        // input execution was interrupted previously and we need to unwind the static file.
1703        let next_static_file_block_num = static_file_provider
1704            .get_highest_static_file_block(StaticFileSegment::Headers)
1705            .map(|id| id + 1)
1706            .unwrap_or_default();
1707        let next_block = highest_uninterrupted_block + 1;
1708
1709        match next_static_file_block_num.cmp(&next_block) {
1710            // The node shutdown between an executed static file commit and before the database
1711            // commit, so we need to unwind the static files.
1712            Ordering::Greater => {
1713                let mut static_file_producer =
1714                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1715                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1716                // Since this is a database <-> static file inconsistency, we commit the change
1717                // straight away.
1718                static_file_producer.commit()?
1719            }
1720            Ordering::Less => {
1721                // There's either missing or corrupted files.
1722                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1723            }
1724            Ordering::Equal => {}
1725        }
1726
1727        let local_head = static_file_provider
1728            .sealed_header(highest_uninterrupted_block)?
1729            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1730
1731        Ok(local_head)
1732    }
1733}
1734
1735impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1736    type Header = HeaderTy<N>;
1737
1738    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1739        if let Some(num) = self.block_number(block_hash)? {
1740            Ok(self.header_by_number(num)?)
1741        } else {
1742            Ok(None)
1743        }
1744    }
1745
1746    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1747        self.static_file_provider.header_by_number(num)
1748    }
1749
1750    fn headers_range(
1751        &self,
1752        range: impl RangeBounds<BlockNumber>,
1753    ) -> ProviderResult<Vec<Self::Header>> {
1754        self.static_file_provider.headers_range(range)
1755    }
1756
1757    fn sealed_header(
1758        &self,
1759        number: BlockNumber,
1760    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1761        self.static_file_provider.sealed_header(number)
1762    }
1763
1764    fn sealed_headers_while(
1765        &self,
1766        range: impl RangeBounds<BlockNumber>,
1767        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1768    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1769        self.static_file_provider.sealed_headers_while(range, predicate)
1770    }
1771}
1772
1773impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1774    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1775        self.static_file_provider.block_hash(number)
1776    }
1777
1778    fn canonical_hashes_range(
1779        &self,
1780        start: BlockNumber,
1781        end: BlockNumber,
1782    ) -> ProviderResult<Vec<B256>> {
1783        self.static_file_provider.canonical_hashes_range(start, end)
1784    }
1785}
1786
1787impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1788    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1789        let best_number = self.best_block_number()?;
1790        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1791        Ok(ChainInfo { best_hash, best_number })
1792    }
1793
1794    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1795        // The best block number is tracked via the finished stage which gets updated in the same tx
1796        // when new blocks committed
1797        Ok(self
1798            .get_stage_checkpoint(StageId::Finish)?
1799            .map(|checkpoint| checkpoint.block_number)
1800            .unwrap_or_default())
1801    }
1802
1803    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1804        self.static_file_provider.last_block_number()
1805    }
1806
1807    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1808        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1809    }
1810}
1811
1812impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1813    type Block = BlockTy<N>;
1814
1815    fn find_block_by_hash(
1816        &self,
1817        hash: B256,
1818        source: BlockSource,
1819    ) -> ProviderResult<Option<Self::Block>> {
1820        if source.is_canonical() {
1821            self.block(hash.into())
1822        } else {
1823            Ok(None)
1824        }
1825    }
1826
1827    /// Returns the block with matching number from database.
1828    ///
1829    /// If the header for this block is not found, this returns `None`.
1830    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1831    /// will return None.
1832    ///
1833    /// Returns an error if the requested block is below the earliest available history.
1834    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1835        if let Some(number) = self.convert_hash_or_number(id)? {
1836            let earliest_available = self.static_file_provider.earliest_history_height();
1837            if number < earliest_available {
1838                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1839            }
1840
1841            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1842
1843            // If the body indices are not found, this means that the transactions either do not
1844            // exist in the database yet, or they do exit but are not indexed.
1845            // If they exist but are not indexed, we don't have enough
1846            // information to return the block anyways, so we return `None`.
1847            let Some(transactions) = self.transactions_by_block(number.into())? else {
1848                return Ok(None)
1849            };
1850
1851            let body = self
1852                .storage
1853                .reader()
1854                .read_block_bodies(self, vec![(&header, transactions)])?
1855                .pop()
1856                .ok_or(ProviderError::InvalidStorageOutput)?;
1857
1858            return Ok(Some(Self::Block::new(header, body)))
1859        }
1860
1861        Ok(None)
1862    }
1863
1864    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1865        Ok(None)
1866    }
1867
1868    fn pending_block_and_receipts(
1869        &self,
1870    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1871        Ok(None)
1872    }
1873
1874    /// Returns the block with senders with matching number or hash from database.
1875    ///
1876    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1877    /// spot, and we want fast querying.**
1878    ///
1879    /// If the header for this block is not found, this returns `None`.
1880    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1881    /// will return None.
1882    fn recovered_block(
1883        &self,
1884        id: BlockHashOrNumber,
1885        transaction_kind: TransactionVariant,
1886    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1887        self.recovered_block(
1888            id,
1889            transaction_kind,
1890            |block_number| self.header_by_number(block_number),
1891            |header, body, senders| {
1892                Self::Block::new(header, body)
1893                    // Note: we're using unchecked here because we know the block contains valid txs
1894                    // wrt to its height and can ignore the s value check so pre
1895                    // EIP-2 txs are allowed
1896                    .try_into_recovered_unchecked(senders)
1897                    .map(Some)
1898                    .map_err(|_| ProviderError::SenderRecoveryError)
1899            },
1900        )
1901    }
1902
1903    fn sealed_block_with_senders(
1904        &self,
1905        id: BlockHashOrNumber,
1906        transaction_kind: TransactionVariant,
1907    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1908        self.recovered_block(
1909            id,
1910            transaction_kind,
1911            |block_number| self.sealed_header(block_number),
1912            |header, body, senders| {
1913                Self::Block::new_sealed(header, body)
1914                    // Note: we're using unchecked here because we know the block contains valid txs
1915                    // wrt to its height and can ignore the s value check so pre
1916                    // EIP-2 txs are allowed
1917                    .try_with_senders_unchecked(senders)
1918                    .map(Some)
1919                    .map_err(|_| ProviderError::SenderRecoveryError)
1920            },
1921        )
1922    }
1923
1924    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1925        self.block_range(
1926            range,
1927            |range| self.headers_range(range),
1928            |header, body, _| Ok(Self::Block::new(header, body)),
1929        )
1930    }
1931
1932    fn block_with_senders_range(
1933        &self,
1934        range: RangeInclusive<BlockNumber>,
1935    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1936        self.block_with_senders_range(
1937            range,
1938            |range| self.headers_range(range),
1939            |header, body, senders| {
1940                Self::Block::new(header, body)
1941                    .try_into_recovered_unchecked(senders)
1942                    .map_err(|_| ProviderError::SenderRecoveryError)
1943            },
1944        )
1945    }
1946
1947    fn recovered_block_range(
1948        &self,
1949        range: RangeInclusive<BlockNumber>,
1950    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1951        self.block_with_senders_range(
1952            range,
1953            |range| self.sealed_headers_range(range),
1954            |header, body, senders| {
1955                Self::Block::new_sealed(header, body)
1956                    .try_with_senders(senders)
1957                    .map_err(|_| ProviderError::SenderRecoveryError)
1958            },
1959        )
1960    }
1961
1962    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1963        Ok(self
1964            .tx
1965            .cursor_read::<tables::TransactionBlocks>()?
1966            .seek(id)
1967            .map(|b| b.map(|(_, bn)| bn))?)
1968    }
1969}
1970
1971impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1972    for DatabaseProvider<TX, N>
1973{
1974    /// Recovers transaction hashes by walking through `Transactions` table and
1975    /// calculating them in a parallel manner. Returned unsorted.
1976    fn transaction_hashes_by_range(
1977        &self,
1978        tx_range: Range<TxNumber>,
1979    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1980        self.static_file_provider.transaction_hashes_by_range(tx_range)
1981    }
1982}
1983
1984// Calculates the hash of the given transaction
1985impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1986    type Transaction = TxTy<N>;
1987
1988    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1989        self.with_rocksdb_snapshot(|rocksdb_ref| {
1990            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1991            reader.get_transaction_hash_number(tx_hash)
1992        })
1993    }
1994
1995    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1996        self.static_file_provider.transaction_by_id(id)
1997    }
1998
1999    fn transaction_by_id_unhashed(
2000        &self,
2001        id: TxNumber,
2002    ) -> ProviderResult<Option<Self::Transaction>> {
2003        self.static_file_provider.transaction_by_id_unhashed(id)
2004    }
2005
2006    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2007        if let Some(id) = self.transaction_id(hash)? {
2008            Ok(self.transaction_by_id_unhashed(id)?)
2009        } else {
2010            Ok(None)
2011        }
2012    }
2013
2014    fn transaction_by_hash_with_meta(
2015        &self,
2016        tx_hash: TxHash,
2017    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2018        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
2019            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
2020            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
2021            let Some(sealed_header) = self.sealed_header(block_number)?
2022        {
2023            let (header, block_hash) = sealed_header.split();
2024            if let Some(block_body) = self.block_body_indices(block_number)? {
2025                // the index of the tx in the block is the offset:
2026                // len([start..tx_id])
2027                // NOTE: `transaction_id` is always `>=` the block's first
2028                // index
2029                let index = transaction_id - block_body.first_tx_num();
2030
2031                let meta = TransactionMeta {
2032                    tx_hash,
2033                    index,
2034                    block_hash,
2035                    block_number,
2036                    base_fee: header.base_fee_per_gas(),
2037                    excess_blob_gas: header.excess_blob_gas(),
2038                    timestamp: header.timestamp(),
2039                };
2040
2041                return Ok(Some((transaction, meta)))
2042            }
2043        }
2044
2045        Ok(None)
2046    }
2047
2048    fn transactions_by_block(
2049        &self,
2050        id: BlockHashOrNumber,
2051    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2052        if let Some(block_number) = self.convert_hash_or_number(id)? &&
2053            let Some(body) = self.block_body_indices(block_number)?
2054        {
2055            let tx_range = body.tx_num_range();
2056            return if tx_range.is_empty() {
2057                Ok(Some(Vec::new()))
2058            } else {
2059                self.transactions_by_tx_range(tx_range).map(Some)
2060            }
2061        }
2062        Ok(None)
2063    }
2064
2065    fn transactions_by_block_range(
2066        &self,
2067        range: impl RangeBounds<BlockNumber>,
2068    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2069        let range = to_range(range);
2070
2071        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2072            .into_iter()
2073            .map(|body| {
2074                let tx_num_range = body.tx_num_range();
2075                if tx_num_range.is_empty() {
2076                    Ok(Vec::new())
2077                } else {
2078                    self.transactions_by_tx_range(tx_num_range)
2079                }
2080            })
2081            .collect()
2082    }
2083
2084    fn transactions_by_tx_range(
2085        &self,
2086        range: impl RangeBounds<TxNumber>,
2087    ) -> ProviderResult<Vec<Self::Transaction>> {
2088        self.static_file_provider.transactions_by_tx_range(range)
2089    }
2090
2091    fn senders_by_tx_range(
2092        &self,
2093        range: impl RangeBounds<TxNumber>,
2094    ) -> ProviderResult<Vec<Address>> {
2095        if EitherWriterDestination::senders(self).is_static_file() {
2096            self.static_file_provider.senders_by_tx_range(range)
2097        } else {
2098            self.cursor_read_collect::<tables::TransactionSenders>(range)
2099        }
2100    }
2101
2102    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2103        if EitherWriterDestination::senders(self).is_static_file() {
2104            self.static_file_provider.transaction_sender(id)
2105        } else {
2106            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2107        }
2108    }
2109}
2110
2111impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2112    type Receipt = ReceiptTy<N>;
2113
2114    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2115        self.static_file_provider.get_with_static_file_or_database(
2116            StaticFileSegment::Receipts,
2117            id,
2118            |static_file| static_file.receipt(id),
2119            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2120        )
2121    }
2122
2123    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2124        if let Some(id) = self.transaction_id(hash)? {
2125            self.receipt(id)
2126        } else {
2127            Ok(None)
2128        }
2129    }
2130
2131    fn receipts_by_block(
2132        &self,
2133        block: BlockHashOrNumber,
2134    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2135        if let Some(number) = self.convert_hash_or_number(block)? &&
2136            let Some(body) = self.block_body_indices(number)?
2137        {
2138            let tx_range = body.tx_num_range();
2139            return if tx_range.is_empty() {
2140                Ok(Some(Vec::new()))
2141            } else {
2142                self.receipts_by_tx_range(tx_range).map(Some)
2143            }
2144        }
2145        Ok(None)
2146    }
2147
2148    fn receipts_by_tx_range(
2149        &self,
2150        range: impl RangeBounds<TxNumber>,
2151    ) -> ProviderResult<Vec<Self::Receipt>> {
2152        self.static_file_provider.get_range_with_static_file_or_database(
2153            StaticFileSegment::Receipts,
2154            to_range(range),
2155            |static_file, range, _| static_file.receipts_by_tx_range(range),
2156            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2157            |_| true,
2158        )
2159    }
2160
2161    fn receipts_by_block_range(
2162        &self,
2163        block_range: RangeInclusive<BlockNumber>,
2164    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2165        if block_range.is_empty() {
2166            return Ok(Vec::new());
2167        }
2168
2169        // collect block body indices for each block in the range
2170        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2171        let mut block_body_indices = Vec::with_capacity(range_len);
2172        for block_num in block_range {
2173            if let Some(indices) = self.block_body_indices(block_num)? {
2174                block_body_indices.push(indices);
2175            } else {
2176                // use default indices for missing blocks (empty block)
2177                block_body_indices.push(StoredBlockBodyIndices::default());
2178            }
2179        }
2180
2181        if block_body_indices.is_empty() {
2182            return Ok(Vec::new());
2183        }
2184
2185        // find blocks with transactions to determine transaction range
2186        let non_empty_blocks: Vec<_> =
2187            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2188
2189        if non_empty_blocks.is_empty() {
2190            // all blocks are empty
2191            return Ok(vec![Vec::new(); block_body_indices.len()]);
2192        }
2193
2194        // calculate the overall transaction range
2195        let first_tx = non_empty_blocks[0].first_tx_num();
2196        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2197
2198        // fetch all receipts in the transaction range
2199        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2200        let mut receipts_iter = all_receipts.into_iter();
2201
2202        // distribute receipts to their respective blocks
2203        let mut result = Vec::with_capacity(block_body_indices.len());
2204        for indices in &block_body_indices {
2205            if indices.tx_count == 0 {
2206                result.push(Vec::new());
2207            } else {
2208                let block_receipts =
2209                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2210                result.push(block_receipts);
2211            }
2212        }
2213
2214        Ok(result)
2215    }
2216}
2217
2218impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2219    for DatabaseProvider<TX, N>
2220{
2221    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2222        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2223    }
2224
2225    fn block_body_indices_range(
2226        &self,
2227        range: RangeInclusive<BlockNumber>,
2228    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2229        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2230    }
2231}
2232
2233impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2234    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2235        Ok(if let Some(encoded) = id.get_pre_encoded() {
2236            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2237        } else {
2238            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2239        })
2240    }
2241
2242    /// Get stage checkpoint progress.
2243    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2244        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2245    }
2246
2247    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2248        self.tx
2249            .cursor_read::<tables::StageCheckpoints>()?
2250            .walk(None)?
2251            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2252            .map_err(ProviderError::Database)
2253    }
2254}
2255
2256impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2257    /// Save stage checkpoint.
2258    fn save_stage_checkpoint(
2259        &self,
2260        id: StageId,
2261        checkpoint: StageCheckpoint,
2262    ) -> ProviderResult<()> {
2263        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2264    }
2265
2266    /// Save stage checkpoint progress.
2267    fn save_stage_checkpoint_progress(
2268        &self,
2269        id: StageId,
2270        checkpoint: Vec<u8>,
2271    ) -> ProviderResult<()> {
2272        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2273    }
2274
2275    #[instrument(level = "debug", target = "providers::db", skip_all)]
2276    fn update_pipeline_stages(
2277        &self,
2278        block_number: BlockNumber,
2279        drop_stage_checkpoint: bool,
2280    ) -> ProviderResult<()> {
2281        // iterate over all existing stages in the table and update its progress.
2282        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2283        for stage_id in StageId::ALL {
2284            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2285            cursor.upsert(
2286                stage_id.to_string(),
2287                &StageCheckpoint {
2288                    block_number,
2289                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2290                },
2291            )?;
2292        }
2293
2294        Ok(())
2295    }
2296}
2297
2298impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2299    fn plain_state_storages(
2300        &self,
2301        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2302    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2303        if self.cached_storage_settings().use_hashed_state() {
2304            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2305
2306            addresses_with_keys
2307                .into_iter()
2308                .map(|(address, storage)| {
2309                    let hashed_address = keccak256(address);
2310                    storage
2311                        .into_iter()
2312                        .map(|key| -> ProviderResult<_> {
2313                            let hashed_key = keccak256(key);
2314                            let value = hashed_storage
2315                                .seek_by_key_subkey(hashed_address, hashed_key)?
2316                                .filter(|v| v.key == hashed_key)
2317                                .map(|v| v.value)
2318                                .unwrap_or_default();
2319                            Ok(StorageEntry { key, value })
2320                        })
2321                        .collect::<ProviderResult<Vec<_>>>()
2322                        .map(|storage| (address, storage))
2323                })
2324                .collect::<ProviderResult<Vec<(_, _)>>>()
2325        } else {
2326            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2327
2328            addresses_with_keys
2329                .into_iter()
2330                .map(|(address, storage)| {
2331                    storage
2332                        .into_iter()
2333                        .map(|key| -> ProviderResult<_> {
2334                            Ok(plain_storage
2335                                .seek_by_key_subkey(address, key)?
2336                                .filter(|v| v.key == key)
2337                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2338                        })
2339                        .collect::<ProviderResult<Vec<_>>>()
2340                        .map(|storage| (address, storage))
2341                })
2342                .collect::<ProviderResult<Vec<(_, _)>>>()
2343        }
2344    }
2345
2346    fn changed_storages_with_range(
2347        &self,
2348        range: RangeInclusive<BlockNumber>,
2349    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2350        if self.cached_storage_settings().storage_v2 {
2351            self.storage_changesets_range(range)?.into_iter().try_fold(
2352                BTreeMap::new(),
2353                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2354                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2355                    accounts.entry(address).or_default().insert(storage_entry.key);
2356                    Ok(accounts)
2357                },
2358            )
2359        } else {
2360            self.tx
2361                .cursor_read::<tables::StorageChangeSets>()?
2362                .walk_range(BlockNumberAddress::range(range))?
2363                // fold all storages and save its old state so we can remove it from HashedStorage
2364                // it is needed as it is dup table.
2365                .try_fold(
2366                    BTreeMap::new(),
2367                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2368                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2369                        accounts.entry(address).or_default().insert(storage_entry.key);
2370                        Ok(accounts)
2371                    },
2372                )
2373        }
2374    }
2375
2376    fn changed_storages_and_blocks_with_range(
2377        &self,
2378        range: RangeInclusive<BlockNumber>,
2379    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2380        if self.cached_storage_settings().storage_v2 {
2381            self.storage_changesets_range(range)?.into_iter().try_fold(
2382                BTreeMap::new(),
2383                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2384                    storages
2385                        .entry((index.address(), storage.key))
2386                        .or_default()
2387                        .push(index.block_number());
2388                    Ok(storages)
2389                },
2390            )
2391        } else {
2392            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2393
2394            let storage_changeset_lists =
2395                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2396                    BTreeMap::new(),
2397                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2398                     entry|
2399                     -> ProviderResult<_> {
2400                        let (index, storage) = entry?;
2401                        storages
2402                            .entry((index.address(), storage.key))
2403                            .or_default()
2404                            .push(index.block_number());
2405                        Ok(storages)
2406                    },
2407                )?;
2408
2409            Ok(storage_changeset_lists)
2410        }
2411    }
2412}
2413
2414impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2415    for DatabaseProvider<TX, N>
2416{
2417    type Receipt = ReceiptTy<N>;
2418
2419    #[instrument(level = "debug", target = "providers::db", skip_all)]
2420    fn write_state<'a>(
2421        &self,
2422        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2423        is_value_known: OriginalValuesKnown,
2424        config: StateWriteConfig,
2425    ) -> ProviderResult<()> {
2426        let execution_outcome = execution_outcome.into();
2427
2428        if self.cached_storage_settings().use_hashed_state() &&
2429            !config.write_receipts &&
2430            !config.write_account_changesets &&
2431            !config.write_storage_changesets
2432        {
2433            // In storage v2 with all outputs directed to static files, plain state and changesets
2434            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
2435            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
2436            self.write_bytecodes(
2437                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
2438            )?;
2439            return Ok(());
2440        }
2441
2442        let first_block = execution_outcome.first_block();
2443        let (plain_state, reverts) =
2444            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2445
2446        self.write_state_reverts(reverts, first_block, config)?;
2447        self.write_state_changes(plain_state)?;
2448
2449        if !config.write_receipts {
2450            return Ok(());
2451        }
2452
2453        let block_count = execution_outcome.len() as u64;
2454        let last_block = execution_outcome.last_block();
2455        let block_range = first_block..=last_block;
2456
2457        let tip = self.last_block_number()?.max(last_block);
2458
2459        // Fetch the first transaction number for each block in the range
2460        let block_indices: Vec<_> = self
2461            .block_body_indices_range(block_range)?
2462            .into_iter()
2463            .map(|b| b.first_tx_num)
2464            .collect();
2465
2466        // Ensure all expected blocks are present.
2467        if block_indices.len() < block_count as usize {
2468            let missing_blocks = block_count - block_indices.len() as u64;
2469            return Err(ProviderError::BlockBodyIndicesNotFound(
2470                last_block.saturating_sub(missing_blocks - 1),
2471            ));
2472        }
2473
2474        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2475
2476        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2477        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2478
2479        // All receipts from the last 128 blocks are required for blockchain tree, even with
2480        // [`PruneSegment::ContractLogs`].
2481        //
2482        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2483        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2484        // files yet. Once receipts exist in static files, we must continue writing to maintain
2485        // continuity and have no gaps.
2486        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2487            self.static_file_provider()
2488                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2489                .is_none()) &&
2490            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2491
2492        // Prepare set of addresses which logs should not be pruned.
2493        let mut allowed_addresses: AddressSet = AddressSet::default();
2494        for (_, addresses) in contract_log_pruner.range(..first_block) {
2495            allowed_addresses.extend(addresses.iter().copied());
2496        }
2497
2498        for (idx, (receipts, first_tx_index)) in
2499            execution_outcome.receipts().zip(block_indices).enumerate()
2500        {
2501            let block_number = first_block + idx as u64;
2502
2503            // Increment block number for receipts static file writer
2504            receipts_writer.increment_block(block_number)?;
2505
2506            // Skip writing receipts if pruning configuration requires us to.
2507            if prunable_receipts &&
2508                self.prune_modes
2509                    .receipts
2510                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2511            {
2512                continue
2513            }
2514
2515            // If there are new addresses to retain after this block number, track them
2516            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2517                allowed_addresses.extend(new_addresses.iter().copied());
2518            }
2519
2520            for (idx, receipt) in receipts.iter().enumerate() {
2521                let receipt_idx = first_tx_index + idx as u64;
2522                // Skip writing receipt if log filter is active and it does not have any logs to
2523                // retain
2524                if prunable_receipts &&
2525                    has_contract_log_filter &&
2526                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2527                {
2528                    continue
2529                }
2530
2531                receipts_writer.append_receipt(receipt_idx, receipt)?;
2532            }
2533        }
2534
2535        Ok(())
2536    }
2537
2538    fn write_state_reverts(
2539        &self,
2540        reverts: PlainStateReverts,
2541        first_block: BlockNumber,
2542        config: StateWriteConfig,
2543    ) -> ProviderResult<()> {
2544        // Write storage changes
2545        if config.write_storage_changesets {
2546            tracing::trace!("Writing storage changes");
2547            let mut storages_cursor =
2548                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2549            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2550                let block_number = first_block + block_index as BlockNumber;
2551
2552                tracing::trace!(block_number, "Writing block change");
2553                // sort changes by address.
2554                storage_changes.par_sort_unstable_by_key(|a| a.address);
2555                let total_changes =
2556                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2557                let mut changeset = Vec::with_capacity(total_changes);
2558                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2559                    let mut storage = storage_revert
2560                        .into_iter()
2561                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2562                        .collect::<Vec<_>>();
2563                    // sort storage slots by key.
2564                    storage.par_sort_unstable_by_key(|a| a.0);
2565
2566                    // If we are writing the primary storage wipe transition, the pre-existing
2567                    // storage state has to be taken from the database and written to storage
2568                    // history. See [StorageWipe::Primary] for more details.
2569                    //
2570                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
2571                    // require collecting wiped entries into a Vec like this, see
2572                    // `write_storage_trie_changesets`.
2573                    let mut wiped_storage = Vec::new();
2574                    if wiped {
2575                        tracing::trace!(?address, "Wiping storage");
2576                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2577                            wiped_storage.push((entry.key, entry.value));
2578                            while let Some(entry) = storages_cursor.next_dup_val()? {
2579                                wiped_storage.push((entry.key, entry.value))
2580                            }
2581                        }
2582                    }
2583
2584                    tracing::trace!(?address, ?storage, "Writing storage reverts");
2585                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2586                        changeset.push(StorageBeforeTx { address, key, value });
2587                    }
2588                }
2589
2590                let mut storage_changesets_writer =
2591                    EitherWriter::new_storage_changesets(self, block_number)?;
2592                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2593            }
2594        }
2595
2596        if !config.write_account_changesets {
2597            return Ok(());
2598        }
2599
2600        // Write account changes
2601        tracing::trace!(?first_block, "Writing account changes");
2602        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2603            let block_number = first_block + block_index as BlockNumber;
2604            let changeset = account_block_reverts
2605                .into_iter()
2606                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2607                .collect::<Vec<_>>();
2608            let mut account_changesets_writer =
2609                EitherWriter::new_account_changesets(self, block_number)?;
2610
2611            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2612        }
2613
2614        Ok(())
2615    }
2616
2617    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2618        // sort all entries so they can be written to database in more performant way.
2619        // and take smaller memory footprint.
2620        changes.accounts.par_sort_by_key(|a| a.0);
2621        changes.storage.par_sort_by_key(|a| a.address);
2622        changes.contracts.par_sort_by_key(|a| a.0);
2623
2624        if !self.cached_storage_settings().use_hashed_state() {
2625            // Write new account state
2626            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2627            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2628            // write account to database.
2629            for (address, account) in changes.accounts {
2630                if let Some(account) = account {
2631                    tracing::trace!(?address, "Updating plain state account");
2632                    accounts_cursor.upsert(address, &account.into())?;
2633                } else if accounts_cursor.seek_exact(address)?.is_some() {
2634                    tracing::trace!(?address, "Deleting plain state account");
2635                    accounts_cursor.delete_current()?;
2636                }
2637            }
2638
2639            // Write new storage state and wipe storage if needed.
2640            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2641            let mut storages_cursor =
2642                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2643            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2644                // Wiping of storage.
2645                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2646                    storages_cursor.delete_current_duplicates()?;
2647                }
2648                // cast storages to B256.
2649                let mut storage = storage
2650                    .into_iter()
2651                    .map(|(k, value)| StorageEntry { key: k.into(), value })
2652                    .collect::<Vec<_>>();
2653                // sort storage slots by key.
2654                storage.par_sort_unstable_by_key(|a| a.key);
2655
2656                for entry in storage {
2657                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2658                    if let Some(db_entry) =
2659                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2660                        db_entry.key == entry.key
2661                    {
2662                        storages_cursor.delete_current()?;
2663                    }
2664
2665                    if !entry.value.is_zero() {
2666                        storages_cursor.upsert(address, &entry)?;
2667                    }
2668                }
2669            }
2670        }
2671
2672        // Write bytecode
2673        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2674        self.write_bytecodes(
2675            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2676        )?;
2677
2678        Ok(())
2679    }
2680
2681    #[instrument(level = "debug", target = "providers::db", skip_all)]
2682    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2683        // Write hashed account updates.
2684        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2685        for (hashed_address, account) in hashed_state.accounts() {
2686            if let Some(account) = account {
2687                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2688            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2689                hashed_accounts_cursor.delete_current()?;
2690            }
2691        }
2692
2693        // Write hashed storage changes.
2694        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2695        let mut hashed_storage_cursor =
2696            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2697        for (hashed_address, storage) in sorted_storages {
2698            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2699                hashed_storage_cursor.delete_current_duplicates()?;
2700            }
2701
2702            for (hashed_slot, value) in storage.storage_slots_ref() {
2703                let entry = StorageEntry { key: *hashed_slot, value: *value };
2704
2705                if let Some(db_entry) =
2706                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2707                    db_entry.key == entry.key
2708                {
2709                    hashed_storage_cursor.delete_current()?;
2710                }
2711
2712                if !entry.value.is_zero() {
2713                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2714                }
2715            }
2716        }
2717
2718        Ok(())
2719    }
2720
2721    /// Remove the last N blocks of state.
2722    ///
2723    /// The latest state will be unwound
2724    ///
2725    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2726    ///    transaction ids.
2727    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2728    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2729    ///    the changesets.
2730    ///    - In order to have both the old and new values in the changesets, we also access the
2731    ///      plain state tables.
2732    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2733    ///    we:
2734    ///     1. Take the old value from the changeset
2735    ///     2. Take the new value from the plain state
2736    ///     3. Save the old value to the local state
2737    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2738    ///    have seen before we:
2739    ///     1. Take the old value from the changeset
2740    ///     2. Take the new value from the local state
2741    ///     3. Set the local state to the value in the changeset
2742    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2743        let range = block + 1..=self.last_block_number()?;
2744
2745        if range.is_empty() {
2746            return Ok(());
2747        }
2748
2749        // We are not removing block meta as it is used to get block changesets.
2750        let block_bodies = self.block_body_indices_range(range.clone())?;
2751
2752        // get transaction receipts
2753        let from_transaction_num =
2754            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2755
2756        let storage_range = BlockNumberAddress::range(range.clone());
2757        let storage_changeset = if self.cached_storage_settings().storage_v2 {
2758            let changesets = self.storage_changesets_range(range.clone())?;
2759            let mut changeset_writer =
2760                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2761            changeset_writer.prune_storage_changesets(block)?;
2762            changesets
2763        } else {
2764            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2765        };
2766        let account_changeset = if self.cached_storage_settings().storage_v2 {
2767            let changesets = self.account_changesets_range(range)?;
2768            let mut changeset_writer =
2769                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2770            changeset_writer.prune_account_changesets(block)?;
2771            changesets
2772        } else {
2773            self.take::<tables::AccountChangeSets>(range)?
2774        };
2775
2776        if self.cached_storage_settings().use_hashed_state() {
2777            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2778            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2779
2780            let (state, _) = self.populate_bundle_state_hashed(
2781                account_changeset,
2782                storage_changeset,
2783                &mut hashed_accounts_cursor,
2784                &mut hashed_storage_cursor,
2785            )?;
2786
2787            for (address, (old_account, new_account, storage)) in &state {
2788                if old_account != new_account {
2789                    let hashed_address = keccak256(address);
2790                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2791                    if let Some(account) = old_account {
2792                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2793                    } else if existing_entry.is_some() {
2794                        hashed_accounts_cursor.delete_current()?;
2795                    }
2796                }
2797
2798                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2799                    let hashed_address = keccak256(address);
2800                    let hashed_storage_key = keccak256(storage_key);
2801                    let storage_entry =
2802                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2803                    if hashed_storage_cursor
2804                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2805                        .is_some_and(|s| s.key == hashed_storage_key)
2806                    {
2807                        hashed_storage_cursor.delete_current()?
2808                    }
2809
2810                    if !old_storage_value.is_zero() {
2811                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2812                    }
2813                }
2814            }
2815        } else {
2816            // This is not working for blocks that are not at tip. as plain state is not the last
2817            // state of end range. We should rename the functions or add support to access
2818            // History state. Accessing history state can be tricky but we are not gaining
2819            // anything.
2820            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2821            let mut plain_storage_cursor =
2822                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2823
2824            let (state, _) = self.populate_bundle_state_plain(
2825                account_changeset,
2826                storage_changeset,
2827                &mut plain_accounts_cursor,
2828                &mut plain_storage_cursor,
2829            )?;
2830
2831            for (address, (old_account, new_account, storage)) in &state {
2832                if old_account != new_account {
2833                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2834                    if let Some(account) = old_account {
2835                        plain_accounts_cursor.upsert(*address, account)?;
2836                    } else if existing_entry.is_some() {
2837                        plain_accounts_cursor.delete_current()?;
2838                    }
2839                }
2840
2841                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2842                    let storage_entry =
2843                        StorageEntry { key: *storage_key, value: *old_storage_value };
2844                    if plain_storage_cursor
2845                        .seek_by_key_subkey(*address, *storage_key)?
2846                        .is_some_and(|s| s.key == *storage_key)
2847                    {
2848                        plain_storage_cursor.delete_current()?
2849                    }
2850
2851                    if !old_storage_value.is_zero() {
2852                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2853                    }
2854                }
2855            }
2856        }
2857
2858        self.remove_receipts_from(from_transaction_num, block)?;
2859
2860        Ok(())
2861    }
2862
2863    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2864    ///
2865    /// The latest state will be unwound and returned back with all the blocks
2866    ///
2867    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2868    ///    transaction ids.
2869    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2870    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2871    ///    the changesets.
2872    ///    - In order to have both the old and new values in the changesets, we also access the
2873    ///      plain state tables.
2874    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2875    ///    we:
2876    ///     1. Take the old value from the changeset
2877    ///     2. Take the new value from the plain state
2878    ///     3. Save the old value to the local state
2879    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2880    ///    have seen before we:
2881    ///     1. Take the old value from the changeset
2882    ///     2. Take the new value from the local state
2883    ///     3. Set the local state to the value in the changeset
2884    fn take_state_above(
2885        &self,
2886        block: BlockNumber,
2887    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2888        let range = block + 1..=self.last_block_number()?;
2889
2890        if range.is_empty() {
2891            return Ok(ExecutionOutcome::default())
2892        }
2893        let start_block_number = *range.start();
2894
2895        // We are not removing block meta as it is used to get block changesets.
2896        let block_bodies = self.block_body_indices_range(range.clone())?;
2897
2898        // get transaction receipts
2899        let from_transaction_num =
2900            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2901        let to_transaction_num =
2902            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2903
2904        let storage_range = BlockNumberAddress::range(range.clone());
2905        let storage_changeset = if let Some(highest_block) = self
2906            .static_file_provider
2907            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2908            self.cached_storage_settings().storage_v2
2909        {
2910            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2911            let mut changeset_writer =
2912                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2913            changeset_writer.prune_storage_changesets(block)?;
2914            changesets
2915        } else {
2916            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2917        };
2918
2919        // if there are static files for this segment, prune them.
2920        let highest_changeset_block = self
2921            .static_file_provider
2922            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2923        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2924            self.cached_storage_settings().storage_v2
2925        {
2926            // TODO: add a `take` method that removes and returns the items instead of doing this
2927            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2928            let mut changeset_writer =
2929                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2930            changeset_writer.prune_account_changesets(block)?;
2931
2932            changesets
2933        } else {
2934            // Have to remove from static files if they exist, otherwise remove using `take` for the
2935            // changeset tables
2936            self.take::<tables::AccountChangeSets>(range)?
2937        };
2938
2939        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2940            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2941            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2942
2943            let (state, reverts) = self.populate_bundle_state_hashed(
2944                account_changeset,
2945                storage_changeset,
2946                &mut hashed_accounts_cursor,
2947                &mut hashed_storage_cursor,
2948            )?;
2949
2950            for (address, (old_account, new_account, storage)) in &state {
2951                if old_account != new_account {
2952                    let hashed_address = keccak256(address);
2953                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2954                    if let Some(account) = old_account {
2955                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2956                    } else if existing_entry.is_some() {
2957                        hashed_accounts_cursor.delete_current()?;
2958                    }
2959                }
2960
2961                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2962                    let hashed_address = keccak256(address);
2963                    let hashed_storage_key = keccak256(storage_key);
2964                    let storage_entry =
2965                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2966                    if hashed_storage_cursor
2967                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2968                        .is_some_and(|s| s.key == hashed_storage_key)
2969                    {
2970                        hashed_storage_cursor.delete_current()?
2971                    }
2972
2973                    if !old_storage_value.is_zero() {
2974                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2975                    }
2976                }
2977            }
2978
2979            (state, reverts)
2980        } else {
2981            // This is not working for blocks that are not at tip. as plain state is not the last
2982            // state of end range. We should rename the functions or add support to access
2983            // History state. Accessing history state can be tricky but we are not gaining
2984            // anything.
2985            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2986            let mut plain_storage_cursor =
2987                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2988
2989            let (state, reverts) = self.populate_bundle_state_plain(
2990                account_changeset,
2991                storage_changeset,
2992                &mut plain_accounts_cursor,
2993                &mut plain_storage_cursor,
2994            )?;
2995
2996            for (address, (old_account, new_account, storage)) in &state {
2997                if old_account != new_account {
2998                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2999                    if let Some(account) = old_account {
3000                        plain_accounts_cursor.upsert(*address, account)?;
3001                    } else if existing_entry.is_some() {
3002                        plain_accounts_cursor.delete_current()?;
3003                    }
3004                }
3005
3006                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
3007                    let storage_entry =
3008                        StorageEntry { key: *storage_key, value: *old_storage_value };
3009                    if plain_storage_cursor
3010                        .seek_by_key_subkey(*address, *storage_key)?
3011                        .is_some_and(|s| s.key == *storage_key)
3012                    {
3013                        plain_storage_cursor.delete_current()?
3014                    }
3015
3016                    if !old_storage_value.is_zero() {
3017                        plain_storage_cursor.upsert(*address, &storage_entry)?;
3018                    }
3019                }
3020            }
3021
3022            (state, reverts)
3023        };
3024
3025        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
3026        let mut receipts_iter = self
3027            .static_file_provider
3028            .get_range_with_static_file_or_database(
3029                StaticFileSegment::Receipts,
3030                from_transaction_num..to_transaction_num + 1,
3031                |static_file, range, _| {
3032                    static_file
3033                        .receipts_by_tx_range(range.clone())
3034                        .map(|r| range.into_iter().zip(r).collect())
3035                },
3036                |range, _| {
3037                    self.tx
3038                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
3039                        .walk_range(range)?
3040                        .map(|r| r.map_err(Into::into))
3041                        .collect()
3042                },
3043                |_| true,
3044            )?
3045            .into_iter()
3046            .peekable();
3047
3048        let mut receipts = Vec::with_capacity(block_bodies.len());
3049        // loop break if we are at the end of the blocks.
3050        for block_body in block_bodies {
3051            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3052            for num in block_body.tx_num_range() {
3053                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3054                    block_receipts.push(receipts_iter.next().unwrap().1);
3055                }
3056            }
3057            receipts.push(block_receipts);
3058        }
3059
3060        self.remove_receipts_from(from_transaction_num, block)?;
3061
3062        Ok(ExecutionOutcome::new_init(
3063            state,
3064            reverts,
3065            Vec::new(),
3066            receipts,
3067            start_block_number,
3068            Vec::new(),
3069        ))
3070    }
3071}
3072
3073impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3074    fn write_account_trie_updates<A: TrieTableAdapter>(
3075        tx: &TX,
3076        trie_updates: &TrieUpdatesSorted,
3077        num_entries: &mut usize,
3078    ) -> ProviderResult<()>
3079    where
3080        TX: DbTxMut,
3081    {
3082        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3083        // Process sorted account nodes
3084        for (key, updated_node) in trie_updates.account_nodes_ref() {
3085            let nibbles = A::AccountKey::from(*key);
3086            match updated_node {
3087                Some(node) => {
3088                    if !key.is_empty() {
3089                        *num_entries += 1;
3090                        account_trie_cursor.upsert(nibbles, node)?;
3091                    }
3092                }
3093                None => {
3094                    *num_entries += 1;
3095                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3096                        account_trie_cursor.delete_current()?;
3097                    }
3098                }
3099            }
3100        }
3101        Ok(())
3102    }
3103
3104    fn write_storage_tries<A: TrieTableAdapter>(
3105        tx: &TX,
3106        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3107        num_entries: &mut usize,
3108    ) -> ProviderResult<()>
3109    where
3110        TX: DbTxMut,
3111    {
3112        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3113        for (hashed_address, storage_trie_updates) in storage_tries {
3114            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3115                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3116            *num_entries +=
3117                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3118            cursor = db_storage_trie_cursor.cursor;
3119        }
3120        Ok(())
3121    }
3122}
3123
3124impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3125    /// Writes trie updates to the database with already sorted updates.
3126    ///
3127    /// Returns the number of entries modified.
3128    #[instrument(level = "debug", target = "providers::db", skip_all)]
3129    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3130        if trie_updates.is_empty() {
3131            return Ok(0)
3132        }
3133
3134        // Track the number of inserted entries.
3135        let mut num_entries = 0;
3136
3137        reth_trie_db::with_adapter!(self, |A| {
3138            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3139        });
3140
3141        num_entries +=
3142            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3143
3144        Ok(num_entries)
3145    }
3146}
3147
3148impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3149    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3150    ///
3151    /// Expects the storage trie updates to already be sorted by the hashed address key.
3152    ///
3153    /// Returns the number of entries modified.
3154    fn write_storage_trie_updates_sorted<'a>(
3155        &self,
3156        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3157    ) -> ProviderResult<usize> {
3158        let mut num_entries = 0;
3159        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3160        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3161        reth_trie_db::with_adapter!(self, |A| {
3162            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3163        });
3164        Ok(num_entries)
3165    }
3166}
3167
3168impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3169    fn unwind_account_hashing<'a>(
3170        &self,
3171        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3172    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3173        // Aggregate all block changesets and make a list of accounts that have been changed.
3174        // Note that collecting and then reversing the order is necessary to ensure that the
3175        // changes are applied in the correct order.
3176        let hashed_accounts = changesets
3177            .into_iter()
3178            .map(|(_, e)| (keccak256(e.address), e.info))
3179            .collect::<Vec<_>>()
3180            .into_iter()
3181            .rev()
3182            .collect::<BTreeMap<_, _>>();
3183
3184        // Apply values to HashedState, and remove the account if it's None.
3185        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3186        for (hashed_address, account) in &hashed_accounts {
3187            if let Some(account) = account {
3188                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3189            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3190                hashed_accounts_cursor.delete_current()?;
3191            }
3192        }
3193
3194        Ok(hashed_accounts)
3195    }
3196
3197    fn unwind_account_hashing_range(
3198        &self,
3199        range: impl RangeBounds<BlockNumber>,
3200    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3201        let changesets = self.account_changesets_range(range)?;
3202        self.unwind_account_hashing(changesets.iter())
3203    }
3204
3205    fn insert_account_for_hashing(
3206        &self,
3207        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3208    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3209        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3210        let hashed_accounts =
3211            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3212        for (hashed_address, account) in &hashed_accounts {
3213            if let Some(account) = account {
3214                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3215            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3216                hashed_accounts_cursor.delete_current()?;
3217            }
3218        }
3219        Ok(hashed_accounts)
3220    }
3221
3222    fn unwind_storage_hashing(
3223        &self,
3224        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3225    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3226        // Aggregate all block changesets and make list of accounts that have been changed.
3227        let mut hashed_storages = changesets
3228            .into_iter()
3229            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3230                let hashed_key = keccak256(storage_entry.key);
3231                (keccak256(address), hashed_key, storage_entry.value)
3232            })
3233            .collect::<Vec<_>>();
3234        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3235
3236        // Apply values to HashedState, and remove the account if it's None.
3237        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3238            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3239        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3240        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3241            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3242
3243            if hashed_storage
3244                .seek_by_key_subkey(hashed_address, key)?
3245                .is_some_and(|entry| entry.key == key)
3246            {
3247                hashed_storage.delete_current()?;
3248            }
3249
3250            if !value.is_zero() {
3251                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3252            }
3253        }
3254        Ok(hashed_storage_keys)
3255    }
3256
3257    fn unwind_storage_hashing_range(
3258        &self,
3259        range: impl RangeBounds<BlockNumber>,
3260    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3261        let changesets = self.storage_changesets_range(range)?;
3262        self.unwind_storage_hashing(changesets.into_iter())
3263    }
3264
3265    fn insert_storage_for_hashing(
3266        &self,
3267        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3268    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3269        // hash values
3270        let hashed_storages =
3271            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3272                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3273                    map.insert(keccak256(entry.key), entry.value);
3274                    map
3275                });
3276                map.insert(keccak256(address), storage);
3277                map
3278            });
3279
3280        let hashed_storage_keys = hashed_storages
3281            .iter()
3282            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3283            .collect();
3284
3285        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3286        // Hash the address and key and apply them to HashedStorage (if Storage is None
3287        // just remove it);
3288        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3289            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3290                if hashed_storage_cursor
3291                    .seek_by_key_subkey(hashed_address, key)?
3292                    .is_some_and(|entry| entry.key == key)
3293                {
3294                    hashed_storage_cursor.delete_current()?;
3295                }
3296
3297                if !value.is_zero() {
3298                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3299                }
3300                Ok(())
3301            })
3302        })?;
3303
3304        Ok(hashed_storage_keys)
3305    }
3306}
3307
3308impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3309    fn unwind_account_history_indices<'a>(
3310        &self,
3311        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3312    ) -> ProviderResult<usize> {
3313        let mut last_indices = changesets
3314            .into_iter()
3315            .map(|(index, account)| (account.address, *index))
3316            .collect::<Vec<_>>();
3317        last_indices.sort_unstable_by_key(|(a, _)| *a);
3318
3319        if self.cached_storage_settings().storage_v2 {
3320            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3321            self.pending_rocksdb_batches.lock().push(batch);
3322        } else {
3323            // Unwind the account history index in MDBX.
3324            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3325            for &(address, rem_index) in &last_indices {
3326                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3327                    &mut cursor,
3328                    ShardedKey::last(address),
3329                    rem_index,
3330                    |sharded_key| sharded_key.key == address,
3331                )?;
3332
3333                // Check the last returned partial shard.
3334                // If it's not empty, the shard needs to be reinserted.
3335                if !partial_shard.is_empty() {
3336                    cursor.insert(
3337                        ShardedKey::last(address),
3338                        &BlockNumberList::new_pre_sorted(partial_shard),
3339                    )?;
3340                }
3341            }
3342        }
3343
3344        let changesets = last_indices.len();
3345        Ok(changesets)
3346    }
3347
3348    fn unwind_account_history_indices_range(
3349        &self,
3350        range: impl RangeBounds<BlockNumber>,
3351    ) -> ProviderResult<usize> {
3352        let changesets = self.account_changesets_range(range)?;
3353        self.unwind_account_history_indices(changesets.iter())
3354    }
3355
3356    fn insert_account_history_index(
3357        &self,
3358        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3359    ) -> ProviderResult<()> {
3360        self.append_history_index::<_, tables::AccountsHistory>(
3361            account_transitions,
3362            ShardedKey::new,
3363        )
3364    }
3365
3366    fn unwind_storage_history_indices(
3367        &self,
3368        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3369    ) -> ProviderResult<usize> {
3370        let mut storage_changesets = changesets
3371            .into_iter()
3372            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3373            .collect::<Vec<_>>();
3374        storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3375
3376        if self.cached_storage_settings().storage_v2 {
3377            let batch =
3378                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3379            self.pending_rocksdb_batches.lock().push(batch);
3380        } else {
3381            // Unwind the storage history index in MDBX.
3382            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3383            for &(address, storage_key, rem_index) in &storage_changesets {
3384                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3385                    &mut cursor,
3386                    StorageShardedKey::last(address, storage_key),
3387                    rem_index,
3388                    |storage_sharded_key| {
3389                        storage_sharded_key.address == address &&
3390                            storage_sharded_key.sharded_key.key == storage_key
3391                    },
3392                )?;
3393
3394                // Check the last returned partial shard.
3395                // If it's not empty, the shard needs to be reinserted.
3396                if !partial_shard.is_empty() {
3397                    cursor.insert(
3398                        StorageShardedKey::last(address, storage_key),
3399                        &BlockNumberList::new_pre_sorted(partial_shard),
3400                    )?;
3401                }
3402            }
3403        }
3404
3405        let changesets = storage_changesets.len();
3406        Ok(changesets)
3407    }
3408
3409    fn unwind_storage_history_indices_range(
3410        &self,
3411        range: impl RangeBounds<BlockNumber>,
3412    ) -> ProviderResult<usize> {
3413        let changesets = self.storage_changesets_range(range)?;
3414        self.unwind_storage_history_indices(changesets.into_iter())
3415    }
3416
3417    fn insert_storage_history_index(
3418        &self,
3419        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3420    ) -> ProviderResult<()> {
3421        self.append_history_index::<_, tables::StoragesHistory>(
3422            storage_transitions,
3423            |(address, storage_key), highest_block_number| {
3424                StorageShardedKey::new(address, storage_key, highest_block_number)
3425            },
3426        )
3427    }
3428
3429    #[instrument(level = "debug", target = "providers::db", skip_all)]
3430    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3431        let storage_settings = self.cached_storage_settings();
3432        if !storage_settings.storage_v2 {
3433            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3434            self.insert_account_history_index(indices)?;
3435        }
3436
3437        if !storage_settings.storage_v2 {
3438            let indices = self.changed_storages_and_blocks_with_range(range)?;
3439            self.insert_storage_history_index(indices)?;
3440        }
3441
3442        Ok(())
3443    }
3444}
3445
3446impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3447    for DatabaseProvider<TX, N>
3448{
3449    fn take_block_and_execution_above(
3450        &self,
3451        block: BlockNumber,
3452    ) -> ProviderResult<Chain<Self::Primitives>> {
3453        let range = block + 1..=self.last_block_number()?;
3454
3455        self.unwind_trie_state_from(block + 1)?;
3456
3457        // get execution res
3458        let execution_state = self.take_state_above(block)?;
3459
3460        let blocks = self.recovered_block_range(range)?;
3461
3462        // remove block bodies it is needed for both get block range and get block execution results
3463        // that is why it is deleted afterwards.
3464        self.remove_blocks_above(block)?;
3465
3466        // Update pipeline progress
3467        self.update_pipeline_stages(block, true)?;
3468
3469        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3470    }
3471
3472    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3473        self.unwind_trie_state_from(block + 1)?;
3474
3475        // remove execution res
3476        self.remove_state_above(block)?;
3477
3478        // remove block bodies it is needed for both get block range and get block execution results
3479        // that is why it is deleted afterwards.
3480        self.remove_blocks_above(block)?;
3481
3482        // Update pipeline progress
3483        self.update_pipeline_stages(block, true)?;
3484
3485        Ok(())
3486    }
3487}
3488
3489impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3490    for DatabaseProvider<TX, N>
3491{
3492    type Block = BlockTy<N>;
3493    type Receipt = ReceiptTy<N>;
3494
3495    /// Inserts the block into the database, writing to both static files and MDBX.
3496    ///
3497    /// This is a convenience method primarily used in tests. For production use,
3498    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3499    fn insert_block(
3500        &self,
3501        block: &RecoveredBlock<Self::Block>,
3502    ) -> ProviderResult<StoredBlockBodyIndices> {
3503        let block_number = block.number();
3504
3505        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3506        let executed_block = ExecutedBlock::new(
3507            Arc::new(block.clone()),
3508            Arc::new(BlockExecutionOutput {
3509                result: BlockExecutionResult {
3510                    receipts: Default::default(),
3511                    requests: Default::default(),
3512                    gas_used: 0,
3513                    blob_gas_used: 0,
3514                },
3515                state: Default::default(),
3516            }),
3517            ComputedTrieData::default(),
3518        );
3519
3520        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3521        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3522
3523        // Return the body indices
3524        self.block_body_indices(block_number)?
3525            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3526    }
3527
3528    fn append_block_bodies(
3529        &self,
3530        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3531    ) -> ProviderResult<()> {
3532        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3533
3534        // Initialize writer if we will be writing transactions to staticfiles
3535        let mut tx_writer =
3536            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3537
3538        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3539        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3540
3541        // Get id for the next tx_num or zero if there are no transactions.
3542        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3543
3544        for (block_number, body) in &bodies {
3545            // Increment block on static file header.
3546            tx_writer.increment_block(*block_number)?;
3547
3548            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3549            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3550
3551            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3552
3553            // insert block meta
3554            block_indices_cursor.append(*block_number, &block_indices)?;
3555
3556            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3557
3558            let Some(body) = body else { continue };
3559
3560            // write transaction block index
3561            if !body.transactions().is_empty() {
3562                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3563                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3564            }
3565
3566            // write transactions
3567            for transaction in body.transactions() {
3568                tx_writer.append_transaction(next_tx_num, transaction)?;
3569
3570                // Increment transaction id for each transaction.
3571                next_tx_num += 1;
3572            }
3573        }
3574
3575        self.storage.writer().write_block_bodies(self, bodies)?;
3576
3577        Ok(())
3578    }
3579
3580    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3581        let last_block_number = self.last_block_number()?;
3582        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3583        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3584            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3585        }
3586
3587        // Get highest static file block for the total block range
3588        let highest_static_file_block = self
3589            .static_file_provider()
3590            .get_highest_static_file_block(StaticFileSegment::Headers)
3591            .expect("todo: error handling, headers should exist");
3592
3593        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3594        // we remove only what is ABOVE the block.
3595        //
3596        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3597        // will have three blocks to remove, which will be block 8, 7, and 6.
3598        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3599        self.static_file_provider()
3600            .get_writer(block, StaticFileSegment::Headers)?
3601            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3602
3603        // First transaction to be removed
3604        let unwind_tx_from = self
3605            .block_body_indices(block)?
3606            .map(|b| b.next_tx_num())
3607            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3608
3609        // Last transaction to be removed
3610        let unwind_tx_to = self
3611            .tx
3612            .cursor_read::<tables::BlockBodyIndices>()?
3613            .last()?
3614            // shouldn't happen because this was OK above
3615            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3616            .1
3617            .last_tx_num();
3618
3619        if unwind_tx_from <= unwind_tx_to {
3620            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3621            self.with_rocksdb_batch(|batch| {
3622                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3623                for (hash, _) in hashes {
3624                    writer.delete_transaction_hash_number(hash)?;
3625                }
3626                Ok(((), writer.into_raw_rocksdb_batch()))
3627            })?;
3628        }
3629
3630        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
3631        // exists in static files or the database.
3632        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3633            EitherWriter::new_senders(self, last_block_number)?
3634                .prune_senders(unwind_tx_from, block)?;
3635        }
3636
3637        self.remove_bodies_above(block)?;
3638
3639        Ok(())
3640    }
3641
3642    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3643        self.storage.writer().remove_block_bodies_above(self, block)?;
3644
3645        // First transaction to be removed
3646        let unwind_tx_from = self
3647            .block_body_indices(block)?
3648            .map(|b| b.next_tx_num())
3649            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3650
3651        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3652        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3653
3654        let static_file_tx_num =
3655            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3656
3657        let to_delete = static_file_tx_num
3658            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3659            .unwrap_or_default();
3660
3661        self.static_file_provider
3662            .latest_writer(StaticFileSegment::Transactions)?
3663            .prune_transactions(to_delete, block)?;
3664
3665        Ok(())
3666    }
3667
3668    /// Appends blocks with their execution state to the database.
3669    ///
3670    /// **Note:** This function is only used in tests.
3671    ///
3672    /// History indices are written to the appropriate backend based on storage settings:
3673    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3674    ///
3675    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3676    fn append_blocks_with_state(
3677        &self,
3678        blocks: Vec<RecoveredBlock<Self::Block>>,
3679        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3680        hashed_state: HashedPostStateSorted,
3681    ) -> ProviderResult<()> {
3682        if blocks.is_empty() {
3683            debug!(target: "providers::db", "Attempted to append empty block range");
3684            return Ok(())
3685        }
3686
3687        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3688        // `None`.
3689        let first_number = blocks[0].number();
3690
3691        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3692        // `None`.
3693        let last_block_number = blocks[blocks.len() - 1].number();
3694
3695        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3696
3697        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3698        // This is necessary because with edge storage, changesets are written to static files
3699        // whose index isn't updated until commit, making them invisible to subsequent reads
3700        // within the same transaction.
3701        let (account_transitions, storage_transitions) = {
3702            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3703            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3704            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3705                let block_number = first_number + block_idx as u64;
3706                for (address, account_revert) in block_reverts {
3707                    account_transitions.entry(*address).or_default().push(block_number);
3708                    for storage_key in account_revert.storage.keys() {
3709                        let key = B256::from(storage_key.to_be_bytes());
3710                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3711                    }
3712                }
3713            }
3714            (account_transitions, storage_transitions)
3715        };
3716
3717        // Insert the blocks
3718        for block in blocks {
3719            self.insert_block(&block)?;
3720            durations_recorder.record_relative(metrics::Action::InsertBlock);
3721        }
3722
3723        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3724        durations_recorder.record_relative(metrics::Action::InsertState);
3725
3726        // insert hashes and intermediate merkle nodes
3727        self.write_hashed_state(&hashed_state)?;
3728        durations_recorder.record_relative(metrics::Action::InsertHashes);
3729
3730        // Use pre-computed transitions for history indices since static file
3731        // writes aren't visible until commit.
3732        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3733        // append_*_history_shard which handles read-merge-write internally.
3734        let storage_settings = self.cached_storage_settings();
3735        if storage_settings.storage_v2 {
3736            self.with_rocksdb_batch(|mut batch| {
3737                for (address, blocks) in account_transitions {
3738                    batch.append_account_history_shard(address, blocks)?;
3739                }
3740                Ok(((), Some(batch.into_inner())))
3741            })?;
3742        } else {
3743            self.insert_account_history_index(account_transitions)?;
3744        }
3745        if storage_settings.storage_v2 {
3746            self.with_rocksdb_batch(|mut batch| {
3747                for ((address, key), blocks) in storage_transitions {
3748                    batch.append_storage_history_shard(address, key, blocks)?;
3749                }
3750                Ok(((), Some(batch.into_inner())))
3751            })?;
3752        } else {
3753            self.insert_storage_history_index(storage_transitions)?;
3754        }
3755        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3756
3757        // Update pipeline progress
3758        self.update_pipeline_stages(last_block_number, false)?;
3759        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3760
3761        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3762
3763        Ok(())
3764    }
3765}
3766
3767impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3768    fn get_prune_checkpoint(
3769        &self,
3770        segment: PruneSegment,
3771    ) -> ProviderResult<Option<PruneCheckpoint>> {
3772        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3773    }
3774
3775    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3776        Ok(PruneSegment::variants()
3777            .filter_map(|segment| {
3778                self.tx
3779                    .get::<tables::PruneCheckpoints>(segment)
3780                    .transpose()
3781                    .map(|chk| chk.map(|chk| (segment, chk)))
3782            })
3783            .collect::<Result<_, _>>()?)
3784    }
3785}
3786
3787impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3788    fn save_prune_checkpoint(
3789        &self,
3790        segment: PruneSegment,
3791        checkpoint: PruneCheckpoint,
3792    ) -> ProviderResult<()> {
3793        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3794    }
3795}
3796
3797impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3798    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3799        let db_entries = self.tx.entries::<T>()?;
3800        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3801            Ok(entries) => entries,
3802            Err(ProviderError::UnsupportedProvider) => 0,
3803            Err(err) => return Err(err),
3804        };
3805
3806        Ok(db_entries + static_file_entries)
3807    }
3808}
3809
3810impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3811    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3812        let mut finalized_blocks = self
3813            .tx
3814            .cursor_read::<tables::ChainState>()?
3815            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3816            .take(1)
3817            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3818
3819        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3820        Ok(last_finalized_block_number)
3821    }
3822
3823    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3824        let mut finalized_blocks = self
3825            .tx
3826            .cursor_read::<tables::ChainState>()?
3827            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3828            .take(1)
3829            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3830
3831        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3832        Ok(last_finalized_block_number)
3833    }
3834}
3835
3836impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3837    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3838        Ok(self
3839            .tx
3840            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3841    }
3842
3843    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3844        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3845    }
3846}
3847
3848impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3849    type Tx = TX;
3850
3851    fn tx_ref(&self) -> &Self::Tx {
3852        &self.tx
3853    }
3854
3855    fn tx_mut(&mut self) -> &mut Self::Tx {
3856        &mut self.tx
3857    }
3858
3859    fn into_tx(self) -> Self::Tx {
3860        self.tx
3861    }
3862
3863    fn prune_modes_ref(&self) -> &PruneModes {
3864        self.prune_modes_ref()
3865    }
3866
3867    /// Commit database transaction, static files, and pending `RocksDB` batches.
3868    #[instrument(
3869        name = "DatabaseProvider::commit",
3870        level = "debug",
3871        target = "providers::db",
3872        skip_all
3873    )]
3874    fn commit(self) -> ProviderResult<()> {
3875        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3876            self.commit_unwind()?;
3877        } else {
3878            // Normal path: finalize() will call sync_all() if not already synced
3879            let mut timings = metrics::CommitTimings::default();
3880
3881            let start = Instant::now();
3882            self.static_file_provider.finalize()?;
3883            timings.sf = start.elapsed();
3884
3885            let start = Instant::now();
3886            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3887            for batch in batches {
3888                self.rocksdb_provider.commit_batch(batch)?;
3889            }
3890            timings.rocksdb = start.elapsed();
3891
3892            let start = Instant::now();
3893            self.tx.commit()?;
3894            timings.mdbx = start.elapsed();
3895
3896            self.metrics.record_commit(&timings);
3897        }
3898
3899        Ok(())
3900    }
3901}
3902
3903impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3904    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3905        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3906    }
3907}
3908
3909impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3910    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3911        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3912    }
3913}
3914
3915impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3916    fn cached_storage_settings(&self) -> StorageSettings {
3917        *self.storage_settings.read()
3918    }
3919
3920    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3921        *self.storage_settings.write() = settings;
3922    }
3923}
3924
3925impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3926    fn storage_path(&self) -> PathBuf {
3927        self.db_path.clone()
3928    }
3929}
3930
3931#[cfg(test)]
3932mod tests {
3933    use super::*;
3934    use crate::{
3935        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3936        BlockWriter,
3937    };
3938    use alloy_consensus::Header;
3939    use alloy_primitives::{
3940        map::{AddressMap, B256Map},
3941        U256,
3942    };
3943    use reth_chain_state::ExecutedBlock;
3944    use reth_db_api::models::StorageSettings;
3945    use reth_ethereum_primitives::Receipt;
3946    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3947    use reth_primitives_traits::SealedBlock;
3948    use reth_storage_api::MetadataWriter;
3949    use reth_testing_utils::generators::{self, random_block, BlockParams};
3950    use reth_trie::{
3951        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3952    };
3953    use revm_database::BundleState;
3954    use revm_state::AccountInfo;
3955    use std::{sync::mpsc, time::Duration};
3956
3957    #[test]
3958    fn test_receipts_by_block_range_empty_range() {
3959        let factory = create_test_provider_factory();
3960        let provider = factory.provider().unwrap();
3961
3962        // empty range should return empty vec
3963        let start = 10u64;
3964        let end = 9u64;
3965        let result = provider.receipts_by_block_range(start..=end).unwrap();
3966        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3967    }
3968
3969    #[test]
3970    fn unwind_commit_waits_for_pre_commit_readers() {
3971        let factory = create_test_provider_factory();
3972
3973        let reader = factory.provider().unwrap();
3974        let provider_rw = factory.unwind_provider_rw().unwrap();
3975        provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
3976        let (done_tx, done_rx) = mpsc::channel();
3977
3978        let handle = std::thread::spawn(move || {
3979            let result = provider_rw.commit();
3980            done_tx.send(result).unwrap();
3981        });
3982
3983        assert!(
3984            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
3985            "unwind commit should wait while an older read transaction is still open"
3986        );
3987
3988        drop(reader);
3989
3990        done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
3991        handle.join().unwrap();
3992    }
3993
3994    #[test]
3995    fn test_receipts_by_block_range_nonexistent_blocks() {
3996        let factory = create_test_provider_factory();
3997        let provider = factory.provider().unwrap();
3998
3999        // non-existent blocks should return empty vecs for each block
4000        let result = provider.receipts_by_block_range(10..=12).unwrap();
4001        assert_eq!(result, vec![vec![], vec![], vec![]]);
4002    }
4003
4004    #[test]
4005    fn test_receipts_by_block_range_single_block() {
4006        let factory = create_test_provider_factory();
4007        let data = BlockchainTestData::default();
4008
4009        let provider_rw = factory.provider_rw().unwrap();
4010        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4011        provider_rw
4012            .write_state(
4013                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4014                crate::OriginalValuesKnown::No,
4015                StateWriteConfig::default(),
4016            )
4017            .unwrap();
4018        provider_rw.insert_block(&data.blocks[0].0).unwrap();
4019        provider_rw
4020            .write_state(
4021                &data.blocks[0].1,
4022                crate::OriginalValuesKnown::No,
4023                StateWriteConfig::default(),
4024            )
4025            .unwrap();
4026        provider_rw.commit().unwrap();
4027
4028        let provider = factory.provider().unwrap();
4029        let result = provider.receipts_by_block_range(1..=1).unwrap();
4030
4031        // should have one vec with one receipt
4032        assert_eq!(result.len(), 1);
4033        assert_eq!(result[0].len(), 1);
4034        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
4035    }
4036
4037    #[test]
4038    fn test_receipts_by_block_range_multiple_blocks() {
4039        let factory = create_test_provider_factory();
4040        let data = BlockchainTestData::default();
4041
4042        let provider_rw = factory.provider_rw().unwrap();
4043        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4044        provider_rw
4045            .write_state(
4046                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4047                crate::OriginalValuesKnown::No,
4048                StateWriteConfig::default(),
4049            )
4050            .unwrap();
4051        for i in 0..3 {
4052            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4053            provider_rw
4054                .write_state(
4055                    &data.blocks[i].1,
4056                    crate::OriginalValuesKnown::No,
4057                    StateWriteConfig::default(),
4058                )
4059                .unwrap();
4060        }
4061        provider_rw.commit().unwrap();
4062
4063        let provider = factory.provider().unwrap();
4064        let result = provider.receipts_by_block_range(1..=3).unwrap();
4065
4066        // should have 3 vecs, each with one receipt
4067        assert_eq!(result.len(), 3);
4068        for (i, block_receipts) in result.iter().enumerate() {
4069            assert_eq!(block_receipts.len(), 1);
4070            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4071        }
4072    }
4073
4074    #[test]
4075    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4076        let factory = create_test_provider_factory();
4077        let data = BlockchainTestData::default();
4078
4079        let provider_rw = factory.provider_rw().unwrap();
4080        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4081        provider_rw
4082            .write_state(
4083                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4084                crate::OriginalValuesKnown::No,
4085                StateWriteConfig::default(),
4086            )
4087            .unwrap();
4088
4089        // insert blocks 1-3 with receipts
4090        for i in 0..3 {
4091            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4092            provider_rw
4093                .write_state(
4094                    &data.blocks[i].1,
4095                    crate::OriginalValuesKnown::No,
4096                    StateWriteConfig::default(),
4097                )
4098                .unwrap();
4099        }
4100        provider_rw.commit().unwrap();
4101
4102        let provider = factory.provider().unwrap();
4103        let result = provider.receipts_by_block_range(1..=3).unwrap();
4104
4105        // verify each block has one receipt
4106        assert_eq!(result.len(), 3);
4107        for block_receipts in &result {
4108            assert_eq!(block_receipts.len(), 1);
4109        }
4110    }
4111
4112    #[test]
4113    fn test_receipts_by_block_range_partial_range() {
4114        let factory = create_test_provider_factory();
4115        let data = BlockchainTestData::default();
4116
4117        let provider_rw = factory.provider_rw().unwrap();
4118        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4119        provider_rw
4120            .write_state(
4121                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4122                crate::OriginalValuesKnown::No,
4123                StateWriteConfig::default(),
4124            )
4125            .unwrap();
4126        for i in 0..3 {
4127            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4128            provider_rw
4129                .write_state(
4130                    &data.blocks[i].1,
4131                    crate::OriginalValuesKnown::No,
4132                    StateWriteConfig::default(),
4133                )
4134                .unwrap();
4135        }
4136        provider_rw.commit().unwrap();
4137
4138        let provider = factory.provider().unwrap();
4139
4140        // request range that includes both existing and non-existing blocks
4141        let result = provider.receipts_by_block_range(2..=5).unwrap();
4142        assert_eq!(result.len(), 4);
4143
4144        // blocks 2-3 should have receipts, blocks 4-5 should be empty
4145        assert_eq!(result[0].len(), 1); // block 2
4146        assert_eq!(result[1].len(), 1); // block 3
4147        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
4148        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
4149
4150        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4151        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4152    }
4153
4154    #[test]
4155    fn test_receipts_by_block_range_all_empty_blocks() {
4156        let factory = create_test_provider_factory();
4157        let mut rng = generators::rng();
4158
4159        // create blocks with no transactions
4160        let mut blocks = Vec::new();
4161        for i in 0..3 {
4162            let block =
4163                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4164            blocks.push(block);
4165        }
4166
4167        let provider_rw = factory.provider_rw().unwrap();
4168        for block in blocks {
4169            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4170        }
4171        provider_rw.commit().unwrap();
4172
4173        let provider = factory.provider().unwrap();
4174        let result = provider.receipts_by_block_range(1..=3).unwrap();
4175
4176        assert_eq!(result.len(), 3);
4177        for block_receipts in result {
4178            assert_eq!(block_receipts.len(), 0);
4179        }
4180    }
4181
4182    #[test]
4183    fn test_receipts_by_block_range_consistency_with_individual_calls() {
4184        let factory = create_test_provider_factory();
4185        let data = BlockchainTestData::default();
4186
4187        let provider_rw = factory.provider_rw().unwrap();
4188        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4189        provider_rw
4190            .write_state(
4191                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4192                crate::OriginalValuesKnown::No,
4193                StateWriteConfig::default(),
4194            )
4195            .unwrap();
4196        for i in 0..3 {
4197            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4198            provider_rw
4199                .write_state(
4200                    &data.blocks[i].1,
4201                    crate::OriginalValuesKnown::No,
4202                    StateWriteConfig::default(),
4203                )
4204                .unwrap();
4205        }
4206        provider_rw.commit().unwrap();
4207
4208        let provider = factory.provider().unwrap();
4209
4210        // get receipts using block range method
4211        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4212
4213        // get receipts using individual block calls
4214        let mut individual_results = Vec::new();
4215        for block_num in 1..=3 {
4216            let receipts =
4217                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4218            individual_results.push(receipts);
4219        }
4220
4221        assert_eq!(range_result, individual_results);
4222    }
4223
4224    #[test]
4225    fn test_write_trie_updates_sorted() {
4226        use reth_trie::{
4227            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4228            BranchNodeCompact, StorageTrieEntry,
4229        };
4230
4231        let factory = create_test_provider_factory();
4232        let provider_rw = factory.provider_rw().unwrap();
4233
4234        // Pre-populate account trie with data that will be deleted
4235        {
4236            let tx = provider_rw.tx_ref();
4237            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4238
4239            // Add account node that will be deleted
4240            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4241            cursor
4242                .upsert(
4243                    to_delete,
4244                    &BranchNodeCompact::new(
4245                        0b1010_1010_1010_1010, // state_mask
4246                        0b0000_0000_0000_0000, // tree_mask
4247                        0b0000_0000_0000_0000, // hash_mask
4248                        vec![],
4249                        None,
4250                    ),
4251                )
4252                .unwrap();
4253
4254            // Add account node that will be updated
4255            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4256            cursor
4257                .upsert(
4258                    to_update,
4259                    &BranchNodeCompact::new(
4260                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4261                        0b0000_0000_0000_0000, // tree_mask
4262                        0b0000_0000_0000_0000, // hash_mask
4263                        vec![],
4264                        None,
4265                    ),
4266                )
4267                .unwrap();
4268        }
4269
4270        // Pre-populate storage tries with data
4271        let storage_address1 = B256::from([1u8; 32]);
4272        let storage_address2 = B256::from([2u8; 32]);
4273        {
4274            let tx = provider_rw.tx_ref();
4275            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4276
4277            // Add storage nodes for address1 (one will be deleted)
4278            storage_cursor
4279                .upsert(
4280                    storage_address1,
4281                    &StorageTrieEntry {
4282                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4283                        node: BranchNodeCompact::new(
4284                            0b0011_0011_0011_0011, // will be deleted
4285                            0b0000_0000_0000_0000,
4286                            0b0000_0000_0000_0000,
4287                            vec![],
4288                            None,
4289                        ),
4290                    },
4291                )
4292                .unwrap();
4293
4294            // Add storage nodes for address2 (will be wiped)
4295            storage_cursor
4296                .upsert(
4297                    storage_address2,
4298                    &StorageTrieEntry {
4299                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4300                        node: BranchNodeCompact::new(
4301                            0b1100_1100_1100_1100, // will be wiped
4302                            0b0000_0000_0000_0000,
4303                            0b0000_0000_0000_0000,
4304                            vec![],
4305                            None,
4306                        ),
4307                    },
4308                )
4309                .unwrap();
4310            storage_cursor
4311                .upsert(
4312                    storage_address2,
4313                    &StorageTrieEntry {
4314                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4315                        node: BranchNodeCompact::new(
4316                            0b0011_1100_0011_1100, // will be wiped
4317                            0b0000_0000_0000_0000,
4318                            0b0000_0000_0000_0000,
4319                            vec![],
4320                            None,
4321                        ),
4322                    },
4323                )
4324                .unwrap();
4325        }
4326
4327        // Create sorted account trie updates
4328        let account_nodes = vec![
4329            (
4330                Nibbles::from_nibbles([0x1, 0x2]),
4331                Some(BranchNodeCompact::new(
4332                    0b1111_1111_1111_1111, // state_mask (updated)
4333                    0b0000_0000_0000_0000, // tree_mask
4334                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4335                    vec![],
4336                    None,
4337                )),
4338            ),
4339            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4340            (
4341                Nibbles::from_nibbles([0x5, 0x6]),
4342                Some(BranchNodeCompact::new(
4343                    0b1111_1111_1111_1111, // state_mask
4344                    0b0000_0000_0000_0000, // tree_mask
4345                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4346                    vec![],
4347                    None,
4348                )),
4349            ),
4350        ];
4351
4352        // Create sorted storage trie updates
4353        let storage_trie1 = StorageTrieUpdatesSorted {
4354            is_deleted: false,
4355            storage_nodes: vec![
4356                (
4357                    Nibbles::from_nibbles([0x1, 0x0]),
4358                    Some(BranchNodeCompact::new(
4359                        0b1111_0000_0000_0000, // state_mask
4360                        0b0000_0000_0000_0000, // tree_mask
4361                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4362                        vec![],
4363                        None,
4364                    )),
4365                ),
4366                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4367            ],
4368        };
4369
4370        let storage_trie2 = StorageTrieUpdatesSorted {
4371            is_deleted: true, // Wipe all storage for this address
4372            storage_nodes: vec![],
4373        };
4374
4375        let mut storage_tries = B256Map::default();
4376        storage_tries.insert(storage_address1, storage_trie1);
4377        storage_tries.insert(storage_address2, storage_trie2);
4378
4379        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4380
4381        // Write the sorted trie updates
4382        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4383
4384        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4385        // storage deletion = 5
4386        assert_eq!(num_entries, 5);
4387
4388        // Verify account trie updates were written correctly
4389        let tx = provider_rw.tx_ref();
4390        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4391
4392        // Check first account node was updated
4393        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4394        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4395        assert!(entry1.is_some(), "Updated account node should exist");
4396        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4397        assert_eq!(
4398            entry1.unwrap().1.state_mask,
4399            expected_mask,
4400            "Account node should have updated state_mask"
4401        );
4402
4403        // Check deleted account node no longer exists
4404        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4405        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4406        assert!(entry2.is_none(), "Deleted account node should not exist");
4407
4408        // Check new account node exists
4409        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4410        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4411        assert!(entry3.is_some(), "New account node should exist");
4412
4413        // Verify storage trie updates were written correctly
4414        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4415
4416        // Check storage for address1
4417        let storage_entries1: Vec<_> = storage_cursor
4418            .walk_dup(Some(storage_address1), None)
4419            .unwrap()
4420            .collect::<Result<Vec<_>, _>>()
4421            .unwrap();
4422        assert_eq!(
4423            storage_entries1.len(),
4424            1,
4425            "Storage address1 should have 1 entry after deletion"
4426        );
4427        assert_eq!(
4428            storage_entries1[0].1.nibbles.0,
4429            Nibbles::from_nibbles([0x1, 0x0]),
4430            "Remaining entry should be [0x1, 0x0]"
4431        );
4432
4433        // Check storage for address2 was wiped
4434        let storage_entries2: Vec<_> = storage_cursor
4435            .walk_dup(Some(storage_address2), None)
4436            .unwrap()
4437            .collect::<Result<Vec<_>, _>>()
4438            .unwrap();
4439        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4440
4441        provider_rw.commit().unwrap();
4442    }
4443
4444    #[test]
4445    fn test_prunable_receipts_logic() {
4446        let insert_blocks =
4447            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4448                let mut rng = generators::rng();
4449                for block_num in 0..=tip_block {
4450                    let block = random_block(
4451                        &mut rng,
4452                        block_num,
4453                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4454                    );
4455                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4456                }
4457            };
4458
4459        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4460            let outcome = ExecutionOutcome {
4461                first_block: block,
4462                receipts: vec![vec![Receipt {
4463                    tx_type: Default::default(),
4464                    success: true,
4465                    cumulative_gas_used: block, // identifier to assert against
4466                    logs: vec![],
4467                }]],
4468                ..Default::default()
4469            };
4470            provider_rw
4471                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4472                .unwrap();
4473            provider_rw.commit().unwrap();
4474        };
4475
4476        // Legacy mode (receipts in DB) - should be prunable
4477        {
4478            let factory = create_test_provider_factory();
4479            let storage_settings = StorageSettings::v1();
4480            factory.set_storage_settings_cache(storage_settings);
4481            let factory = factory.with_prune_modes(PruneModes {
4482                receipts: Some(PruneMode::Before(100)),
4483                ..Default::default()
4484            });
4485
4486            let tip_block = 200u64;
4487            let first_block = 1u64;
4488
4489            // create chain
4490            let provider_rw = factory.provider_rw().unwrap();
4491            insert_blocks(&provider_rw, tip_block, 1);
4492            provider_rw.commit().unwrap();
4493
4494            write_receipts(
4495                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4496                first_block,
4497            );
4498            write_receipts(
4499                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4500                tip_block - 1,
4501            );
4502
4503            let provider = factory.provider().unwrap();
4504
4505            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4506                assert!(provider
4507                    .receipts_by_block(block.into())
4508                    .unwrap()
4509                    .is_some_and(|r| r.len() == num_receipts));
4510            }
4511        }
4512
4513        // Static files mode
4514        {
4515            let factory = create_test_provider_factory();
4516            let storage_settings = StorageSettings::v2();
4517            factory.set_storage_settings_cache(storage_settings);
4518            let factory = factory.with_prune_modes(PruneModes {
4519                receipts: Some(PruneMode::Before(2)),
4520                ..Default::default()
4521            });
4522
4523            let tip_block = 200u64;
4524
4525            // create chain
4526            let provider_rw = factory.provider_rw().unwrap();
4527            insert_blocks(&provider_rw, tip_block, 1);
4528            provider_rw.commit().unwrap();
4529
4530            // Attempt to write receipts for block 0 and 1 (should be skipped)
4531            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4532            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4533
4534            assert!(factory
4535                .static_file_provider()
4536                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4537                .is_none(),);
4538            assert!(factory
4539                .static_file_provider()
4540                .get_highest_static_file_block(StaticFileSegment::Receipts)
4541                .is_some_and(|b| b == 1),);
4542
4543            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4544            // static files.
4545            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4546            assert!(factory
4547                .static_file_provider()
4548                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4549                .is_some_and(|num| num == 2),);
4550
4551            // After having a receipt already in static files, attempt to skip the next receipt by
4552            // changing the prune mode. It should NOT skip it and should still write the receipt,
4553            // since static files do not support gaps.
4554            let factory = factory.with_prune_modes(PruneModes {
4555                receipts: Some(PruneMode::Before(100)),
4556                ..Default::default()
4557            });
4558            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4559            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4560            write_receipts(provider_rw, 3);
4561
4562            // Ensure we can only fetch the 2 last receipts.
4563            //
4564            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4565            // to the block number it belongs to easily identify and assert.
4566            let provider = factory.provider().unwrap();
4567            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4568            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4569                assert!(provider
4570                    .receipts_by_block(num.into())
4571                    .unwrap()
4572                    .is_some_and(|r| r.len() == num_receipts));
4573
4574                let receipt = provider.receipt(num).unwrap();
4575                if num_receipts > 0 {
4576                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4577                } else {
4578                    assert!(receipt.is_none());
4579                }
4580            }
4581        }
4582    }
4583
4584    #[test]
4585    fn test_try_into_history_rejects_unexecuted_blocks() {
4586        use reth_storage_api::TryIntoHistoricalStateProvider;
4587
4588        let factory = create_test_provider_factory();
4589
4590        // Insert genesis block to have some data
4591        let data = BlockchainTestData::default();
4592        let provider_rw = factory.provider_rw().unwrap();
4593        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4594        provider_rw
4595            .write_state(
4596                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4597                crate::OriginalValuesKnown::No,
4598                StateWriteConfig::default(),
4599            )
4600            .unwrap();
4601        provider_rw.commit().unwrap();
4602
4603        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4604        let provider = factory.provider().unwrap();
4605
4606        // Requesting historical state for block 0 (executed) should succeed
4607        let result = provider.try_into_history_at_block(0);
4608        assert!(result.is_ok(), "Block 0 should be available");
4609
4610        // Get another provider and request state for block 100 (not executed)
4611        let provider = factory.provider().unwrap();
4612        let result = provider.try_into_history_at_block(100);
4613
4614        // Should fail with BlockNotExecuted error
4615        match result {
4616            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4617            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4618            Ok(_) => panic!("Expected error, got Ok"),
4619        }
4620    }
4621
4622    #[test]
4623    fn test_unwind_storage_hashing_with_hashed_state() {
4624        let factory = create_test_provider_factory();
4625        let storage_settings = StorageSettings::v2();
4626        factory.set_storage_settings_cache(storage_settings);
4627
4628        let address = Address::random();
4629        let hashed_address = keccak256(address);
4630
4631        let plain_slot = B256::random();
4632        let hashed_slot = keccak256(plain_slot);
4633
4634        let current_value = U256::from(100);
4635        let old_value = U256::from(42);
4636
4637        let provider_rw = factory.provider_rw().unwrap();
4638        provider_rw
4639            .tx
4640            .cursor_dup_write::<tables::HashedStorages>()
4641            .unwrap()
4642            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4643            .unwrap();
4644
4645        let changesets = vec![(
4646            BlockNumberAddress((1, address)),
4647            StorageEntry { key: plain_slot, value: old_value },
4648        )];
4649
4650        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4651
4652        assert_eq!(result.len(), 1);
4653        assert!(result.contains_key(&hashed_address));
4654        assert!(result[&hashed_address].contains(&hashed_slot));
4655
4656        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4657        let entry = cursor
4658            .seek_by_key_subkey(hashed_address, hashed_slot)
4659            .unwrap()
4660            .expect("entry should exist");
4661        assert_eq!(entry.key, hashed_slot);
4662        assert_eq!(entry.value, old_value);
4663    }
4664
4665    #[test]
4666    fn test_write_and_remove_state_roundtrip_legacy() {
4667        let factory = create_test_provider_factory();
4668        let storage_settings = StorageSettings::v1();
4669        assert!(!storage_settings.use_hashed_state());
4670        factory.set_storage_settings_cache(storage_settings);
4671
4672        let address = Address::with_last_byte(1);
4673        let hashed_address = keccak256(address);
4674        let slot = U256::from(5);
4675        let slot_key = B256::from(slot);
4676        let hashed_slot = keccak256(slot_key);
4677
4678        let mut rng = generators::rng();
4679        let block0 =
4680            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4681        let block1 =
4682            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4683
4684        {
4685            let provider_rw = factory.provider_rw().unwrap();
4686            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4687            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4688            provider_rw
4689                .tx
4690                .cursor_write::<tables::PlainAccountState>()
4691                .unwrap()
4692                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4693                .unwrap();
4694            provider_rw.commit().unwrap();
4695        }
4696
4697        let provider_rw = factory.provider_rw().unwrap();
4698
4699        let mut state_init: BundleStateInit = AddressMap::default();
4700        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4701        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4702        state_init.insert(
4703            address,
4704            (
4705                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4706                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4707                storage_map,
4708            ),
4709        );
4710
4711        let mut reverts_init: RevertsInit = HashMap::default();
4712        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4713        block_reverts.insert(
4714            address,
4715            (
4716                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4717                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4718            ),
4719        );
4720        reverts_init.insert(1, block_reverts);
4721
4722        let execution_outcome =
4723            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4724
4725        provider_rw
4726            .write_state(
4727                &execution_outcome,
4728                OriginalValuesKnown::Yes,
4729                StateWriteConfig {
4730                    write_receipts: false,
4731                    write_account_changesets: true,
4732                    write_storage_changesets: true,
4733                },
4734            )
4735            .unwrap();
4736
4737        let hashed_state =
4738            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4739        provider_rw.write_hashed_state(&hashed_state).unwrap();
4740
4741        let account = provider_rw
4742            .tx
4743            .cursor_read::<tables::PlainAccountState>()
4744            .unwrap()
4745            .seek_exact(address)
4746            .unwrap()
4747            .unwrap()
4748            .1;
4749        assert_eq!(account.nonce, 1);
4750
4751        let storage_entry = provider_rw
4752            .tx
4753            .cursor_dup_read::<tables::PlainStorageState>()
4754            .unwrap()
4755            .seek_by_key_subkey(address, slot_key)
4756            .unwrap()
4757            .unwrap();
4758        assert_eq!(storage_entry.key, slot_key);
4759        assert_eq!(storage_entry.value, U256::from(10));
4760
4761        let hashed_entry = provider_rw
4762            .tx
4763            .cursor_dup_read::<tables::HashedStorages>()
4764            .unwrap()
4765            .seek_by_key_subkey(hashed_address, hashed_slot)
4766            .unwrap()
4767            .unwrap();
4768        assert_eq!(hashed_entry.key, hashed_slot);
4769        assert_eq!(hashed_entry.value, U256::from(10));
4770
4771        let account_cs_entries = provider_rw
4772            .tx
4773            .cursor_dup_read::<tables::AccountChangeSets>()
4774            .unwrap()
4775            .walk(Some(1))
4776            .unwrap()
4777            .collect::<Result<Vec<_>, _>>()
4778            .unwrap();
4779        assert!(!account_cs_entries.is_empty());
4780
4781        let storage_cs_entries = provider_rw
4782            .tx
4783            .cursor_read::<tables::StorageChangeSets>()
4784            .unwrap()
4785            .walk(Some(BlockNumberAddress((1, address))))
4786            .unwrap()
4787            .collect::<Result<Vec<_>, _>>()
4788            .unwrap();
4789        assert!(!storage_cs_entries.is_empty());
4790        assert_eq!(storage_cs_entries[0].1.key, slot_key);
4791
4792        provider_rw.remove_state_above(0).unwrap();
4793
4794        let restored_account = provider_rw
4795            .tx
4796            .cursor_read::<tables::PlainAccountState>()
4797            .unwrap()
4798            .seek_exact(address)
4799            .unwrap()
4800            .unwrap()
4801            .1;
4802        assert_eq!(restored_account.nonce, 0);
4803
4804        let storage_gone = provider_rw
4805            .tx
4806            .cursor_dup_read::<tables::PlainStorageState>()
4807            .unwrap()
4808            .seek_by_key_subkey(address, slot_key)
4809            .unwrap();
4810        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4811
4812        let account_cs_after = provider_rw
4813            .tx
4814            .cursor_dup_read::<tables::AccountChangeSets>()
4815            .unwrap()
4816            .walk(Some(1))
4817            .unwrap()
4818            .collect::<Result<Vec<_>, _>>()
4819            .unwrap();
4820        assert!(account_cs_after.is_empty());
4821
4822        let storage_cs_after = provider_rw
4823            .tx
4824            .cursor_read::<tables::StorageChangeSets>()
4825            .unwrap()
4826            .walk(Some(BlockNumberAddress((1, address))))
4827            .unwrap()
4828            .collect::<Result<Vec<_>, _>>()
4829            .unwrap();
4830        assert!(storage_cs_after.is_empty());
4831    }
4832
4833    #[test]
4834    fn test_unwind_storage_hashing_legacy() {
4835        let factory = create_test_provider_factory();
4836        let storage_settings = StorageSettings::v1();
4837        assert!(!storage_settings.use_hashed_state());
4838        factory.set_storage_settings_cache(storage_settings);
4839
4840        let address = Address::random();
4841        let hashed_address = keccak256(address);
4842
4843        let plain_slot = B256::random();
4844        let hashed_slot = keccak256(plain_slot);
4845
4846        let current_value = U256::from(100);
4847        let old_value = U256::from(42);
4848
4849        let provider_rw = factory.provider_rw().unwrap();
4850        provider_rw
4851            .tx
4852            .cursor_dup_write::<tables::HashedStorages>()
4853            .unwrap()
4854            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4855            .unwrap();
4856
4857        let changesets = vec![(
4858            BlockNumberAddress((1, address)),
4859            StorageEntry { key: plain_slot, value: old_value },
4860        )];
4861
4862        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4863
4864        assert_eq!(result.len(), 1);
4865        assert!(result.contains_key(&hashed_address));
4866        assert!(result[&hashed_address].contains(&hashed_slot));
4867
4868        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4869        let entry = cursor
4870            .seek_by_key_subkey(hashed_address, hashed_slot)
4871            .unwrap()
4872            .expect("entry should exist");
4873        assert_eq!(entry.key, hashed_slot);
4874        assert_eq!(entry.value, old_value);
4875    }
4876
4877    #[test]
4878    fn test_write_state_and_historical_read_hashed() {
4879        use reth_storage_api::StateProvider;
4880        use reth_trie::{HashedPostState, KeccakKeyHasher};
4881        use revm_database::BundleState;
4882        use revm_state::AccountInfo;
4883
4884        let factory = create_test_provider_factory();
4885        factory.set_storage_settings_cache(StorageSettings::v2());
4886
4887        let address = Address::with_last_byte(1);
4888        let slot = U256::from(5);
4889        let slot_key = B256::from(slot);
4890        let hashed_address = keccak256(address);
4891        let hashed_slot = keccak256(slot_key);
4892
4893        {
4894            let sf = factory.static_file_provider();
4895            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4896            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4897            hw.append_header(&h0, &B256::ZERO).unwrap();
4898            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4899            hw.append_header(&h1, &B256::ZERO).unwrap();
4900            hw.commit().unwrap();
4901
4902            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4903            aw.append_account_changeset(vec![], 0).unwrap();
4904            aw.commit().unwrap();
4905
4906            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4907            sw.append_storage_changeset(vec![], 0).unwrap();
4908            sw.commit().unwrap();
4909        }
4910
4911        let provider_rw = factory.provider_rw().unwrap();
4912
4913        let bundle = BundleState::builder(1..=1)
4914            .state_present_account_info(
4915                address,
4916                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4917            )
4918            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4919            .revert_account_info(1, address, Some(None))
4920            .revert_storage(1, address, vec![(slot, U256::ZERO)])
4921            .build();
4922
4923        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4924
4925        provider_rw
4926            .tx
4927            .put::<tables::BlockBodyIndices>(
4928                1,
4929                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4930            )
4931            .unwrap();
4932
4933        provider_rw
4934            .write_state(
4935                &execution_outcome,
4936                OriginalValuesKnown::Yes,
4937                StateWriteConfig {
4938                    write_receipts: false,
4939                    write_account_changesets: true,
4940                    write_storage_changesets: true,
4941                },
4942            )
4943            .unwrap();
4944
4945        let hashed_state =
4946            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4947        provider_rw.write_hashed_state(&hashed_state).unwrap();
4948
4949        let plain_storage_entries = provider_rw
4950            .tx
4951            .cursor_dup_read::<tables::PlainStorageState>()
4952            .unwrap()
4953            .walk(None)
4954            .unwrap()
4955            .collect::<Result<Vec<_>, _>>()
4956            .unwrap();
4957        assert!(plain_storage_entries.is_empty());
4958
4959        let hashed_entry = provider_rw
4960            .tx
4961            .cursor_dup_read::<tables::HashedStorages>()
4962            .unwrap()
4963            .seek_by_key_subkey(hashed_address, hashed_slot)
4964            .unwrap()
4965            .unwrap();
4966        assert_eq!(hashed_entry.key, hashed_slot);
4967        assert_eq!(hashed_entry.value, U256::from(10));
4968
4969        provider_rw.static_file_provider().commit().unwrap();
4970
4971        let sf = factory.static_file_provider();
4972        let storage_cs = sf.storage_changeset(1).unwrap();
4973        assert!(!storage_cs.is_empty());
4974        assert_eq!(storage_cs[0].1.key, slot_key);
4975
4976        let account_cs = sf.account_block_changeset(1).unwrap();
4977        assert!(!account_cs.is_empty());
4978        assert_eq!(account_cs[0].address, address);
4979
4980        let historical_value =
4981            HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
4982                .storage(address, slot_key)
4983                .unwrap();
4984        assert_eq!(historical_value, None);
4985    }
4986
4987    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4988    enum StorageMode {
4989        V1,
4990        V2,
4991    }
4992
4993    fn run_save_blocks_and_verify(mode: StorageMode) {
4994        use alloy_primitives::map::{FbBuildHasher, HashMap};
4995
4996        let factory = create_test_provider_factory();
4997
4998        match mode {
4999            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
5000            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
5001        }
5002
5003        let num_blocks = 3u64;
5004        let accounts_per_block = 5usize;
5005        let slots_per_account = 3usize;
5006
5007        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
5008            SealedHeader::new(
5009                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
5010                B256::ZERO,
5011            ),
5012            Default::default(),
5013        );
5014
5015        let genesis_executed = ExecutedBlock::new(
5016            Arc::new(genesis.try_recover().unwrap()),
5017            Arc::new(BlockExecutionOutput {
5018                result: BlockExecutionResult {
5019                    receipts: vec![],
5020                    requests: Default::default(),
5021                    gas_used: 0,
5022                    blob_gas_used: 0,
5023                },
5024                state: Default::default(),
5025            }),
5026            ComputedTrieData::default(),
5027        );
5028        let provider_rw = factory.provider_rw().unwrap();
5029        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5030        provider_rw.commit().unwrap();
5031
5032        let mut blocks: Vec<ExecutedBlock> = Vec::new();
5033        let mut parent_hash = B256::ZERO;
5034
5035        for block_num in 1..=num_blocks {
5036            let mut builder = BundleState::builder(block_num..=block_num);
5037
5038            for acct_idx in 0..accounts_per_block {
5039                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5040                let info = AccountInfo {
5041                    nonce: block_num,
5042                    balance: U256::from(block_num * 100 + acct_idx as u64),
5043                    ..Default::default()
5044                };
5045
5046                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5047                    slots_per_account as u64)
5048                    .map(|s| {
5049                        (
5050                            U256::from(s + acct_idx as u64 * 100),
5051                            (U256::ZERO, U256::from(block_num * 1000 + s)),
5052                        )
5053                    })
5054                    .collect();
5055
5056                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5057                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5058                    .collect();
5059
5060                builder = builder
5061                    .state_present_account_info(address, info)
5062                    .revert_account_info(block_num, address, Some(None))
5063                    .state_storage(address, storage)
5064                    .revert_storage(block_num, address, revert_storage);
5065            }
5066
5067            let bundle = builder.build();
5068
5069            let hashed_state =
5070                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5071
5072            let header = Header {
5073                number: block_num,
5074                parent_hash,
5075                difficulty: U256::from(1),
5076                ..Default::default()
5077            };
5078            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5079                header,
5080                Default::default(),
5081            );
5082            parent_hash = block.hash();
5083
5084            let executed = ExecutedBlock::new(
5085                Arc::new(block.try_recover().unwrap()),
5086                Arc::new(BlockExecutionOutput {
5087                    result: BlockExecutionResult {
5088                        receipts: vec![],
5089                        requests: Default::default(),
5090                        gas_used: 0,
5091                        blob_gas_used: 0,
5092                    },
5093                    state: bundle,
5094                }),
5095                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5096            );
5097            blocks.push(executed);
5098        }
5099
5100        let provider_rw = factory.provider_rw().unwrap();
5101        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5102        provider_rw.commit().unwrap();
5103
5104        let provider = factory.provider().unwrap();
5105
5106        for block_num in 1..=num_blocks {
5107            for acct_idx in 0..accounts_per_block {
5108                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5109                let hashed_address = keccak256(address);
5110
5111                let ha_entry = provider
5112                    .tx_ref()
5113                    .cursor_read::<tables::HashedAccounts>()
5114                    .unwrap()
5115                    .seek_exact(hashed_address)
5116                    .unwrap();
5117                assert!(
5118                    ha_entry.is_some(),
5119                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5120                );
5121
5122                for s in 1..=slots_per_account as u64 {
5123                    let slot = U256::from(s + acct_idx as u64 * 100);
5124                    let slot_key = B256::from(slot);
5125                    let hashed_slot = keccak256(slot_key);
5126
5127                    let hs_entry = provider
5128                        .tx_ref()
5129                        .cursor_dup_read::<tables::HashedStorages>()
5130                        .unwrap()
5131                        .seek_by_key_subkey(hashed_address, hashed_slot)
5132                        .unwrap();
5133                    assert!(
5134                        hs_entry.is_some(),
5135                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5136                    );
5137                    let entry = hs_entry.unwrap();
5138                    assert_eq!(entry.key, hashed_slot);
5139                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5140                }
5141            }
5142        }
5143
5144        for block_num in 1..=num_blocks {
5145            let header = provider.header_by_number(block_num).unwrap();
5146            assert!(header.is_some(), "Header missing for block {block_num}");
5147
5148            let indices = provider.block_body_indices(block_num).unwrap();
5149            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5150        }
5151
5152        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5153        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5154
5155        if mode == StorageMode::V2 {
5156            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5157            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5158
5159            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5160            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5161
5162            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5163            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5164
5165            provider.static_file_provider().commit().unwrap();
5166            let sf = factory.static_file_provider();
5167
5168            for block_num in 1..=num_blocks {
5169                let account_cs = sf.account_block_changeset(block_num).unwrap();
5170                assert!(
5171                    !account_cs.is_empty(),
5172                    "v2: static file AccountChangeSets should exist for block {block_num}"
5173                );
5174
5175                let storage_cs = sf.storage_changeset(block_num).unwrap();
5176                assert!(
5177                    !storage_cs.is_empty(),
5178                    "v2: static file StorageChangeSets should exist for block {block_num}"
5179                );
5180
5181                for (_, entry) in &storage_cs {
5182                    assert!(
5183                        entry.key != keccak256(entry.key),
5184                        "v2: static file storage changeset should have plain slot keys"
5185                    );
5186                }
5187            }
5188
5189            let rocksdb = factory.rocksdb_provider();
5190            for block_num in 1..=num_blocks {
5191                for acct_idx in 0..accounts_per_block {
5192                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5193                    let shards = rocksdb.account_history_shards(address).unwrap();
5194                    assert!(
5195                        !shards.is_empty(),
5196                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5197                    );
5198
5199                    for s in 1..=slots_per_account as u64 {
5200                        let slot = U256::from(s + acct_idx as u64 * 100);
5201                        let slot_key = B256::from(slot);
5202                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5203                        assert!(
5204                            !shards.is_empty(),
5205                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5206                        );
5207                    }
5208                }
5209            }
5210        } else {
5211            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5212            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5213
5214            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5215            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5216
5217            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5218            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5219
5220            for block_num in 1..=num_blocks {
5221                let storage_entries: Vec<_> = provider
5222                    .tx_ref()
5223                    .cursor_dup_read::<tables::StorageChangeSets>()
5224                    .unwrap()
5225                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5226                    .unwrap()
5227                    .collect::<Result<Vec<_>, _>>()
5228                    .unwrap();
5229                assert!(
5230                    !storage_entries.is_empty(),
5231                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5232                );
5233
5234                for (_, entry) in &storage_entries {
5235                    let slot_key = B256::from(entry.key);
5236                    assert!(
5237                        slot_key != keccak256(slot_key),
5238                        "v1: storage changeset keys should be plain (not hashed)"
5239                    );
5240                }
5241            }
5242
5243            let mdbx_account_history =
5244                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5245            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5246
5247            let mdbx_storage_history =
5248                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5249            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5250        }
5251    }
5252
5253    #[test]
5254    fn test_save_blocks_v1_table_assertions() {
5255        run_save_blocks_and_verify(StorageMode::V1);
5256    }
5257
5258    #[test]
5259    fn test_save_blocks_v2_table_assertions() {
5260        run_save_blocks_and_verify(StorageMode::V2);
5261    }
5262
5263    #[test]
5264    fn test_write_and_remove_state_roundtrip_v2() {
5265        let factory = create_test_provider_factory();
5266        let storage_settings = StorageSettings::v2();
5267        assert!(storage_settings.use_hashed_state());
5268        factory.set_storage_settings_cache(storage_settings);
5269
5270        let address = Address::with_last_byte(1);
5271        let hashed_address = keccak256(address);
5272        let slot = U256::from(5);
5273        let slot_key = B256::from(slot);
5274        let hashed_slot = keccak256(slot_key);
5275
5276        {
5277            let sf = factory.static_file_provider();
5278            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5279            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5280            hw.append_header(&h0, &B256::ZERO).unwrap();
5281            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5282            hw.append_header(&h1, &B256::ZERO).unwrap();
5283            hw.commit().unwrap();
5284
5285            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5286            aw.append_account_changeset(vec![], 0).unwrap();
5287            aw.commit().unwrap();
5288
5289            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5290            sw.append_storage_changeset(vec![], 0).unwrap();
5291            sw.commit().unwrap();
5292        }
5293
5294        {
5295            let provider_rw = factory.provider_rw().unwrap();
5296            provider_rw
5297                .tx
5298                .put::<tables::BlockBodyIndices>(
5299                    0,
5300                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5301                )
5302                .unwrap();
5303            provider_rw
5304                .tx
5305                .put::<tables::BlockBodyIndices>(
5306                    1,
5307                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5308                )
5309                .unwrap();
5310            provider_rw
5311                .tx
5312                .cursor_write::<tables::HashedAccounts>()
5313                .unwrap()
5314                .upsert(
5315                    hashed_address,
5316                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5317                )
5318                .unwrap();
5319            provider_rw.commit().unwrap();
5320        }
5321
5322        let provider_rw = factory.provider_rw().unwrap();
5323
5324        let bundle = BundleState::builder(1..=1)
5325            .state_present_account_info(
5326                address,
5327                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5328            )
5329            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5330            .revert_account_info(1, address, Some(None))
5331            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5332            .build();
5333
5334        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5335
5336        provider_rw
5337            .write_state(
5338                &execution_outcome,
5339                OriginalValuesKnown::Yes,
5340                StateWriteConfig {
5341                    write_receipts: false,
5342                    write_account_changesets: true,
5343                    write_storage_changesets: true,
5344                },
5345            )
5346            .unwrap();
5347
5348        let hashed_state =
5349            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5350        provider_rw.write_hashed_state(&hashed_state).unwrap();
5351
5352        let hashed_account = provider_rw
5353            .tx
5354            .cursor_read::<tables::HashedAccounts>()
5355            .unwrap()
5356            .seek_exact(hashed_address)
5357            .unwrap()
5358            .unwrap()
5359            .1;
5360        assert_eq!(hashed_account.nonce, 1);
5361
5362        let hashed_entry = provider_rw
5363            .tx
5364            .cursor_dup_read::<tables::HashedStorages>()
5365            .unwrap()
5366            .seek_by_key_subkey(hashed_address, hashed_slot)
5367            .unwrap()
5368            .unwrap();
5369        assert_eq!(hashed_entry.key, hashed_slot);
5370        assert_eq!(hashed_entry.value, U256::from(10));
5371
5372        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5373        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5374
5375        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5376        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5377
5378        provider_rw.static_file_provider().commit().unwrap();
5379
5380        let sf = factory.static_file_provider();
5381        let storage_cs = sf.storage_changeset(1).unwrap();
5382        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5383        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5384
5385        provider_rw.remove_state_above(0).unwrap();
5386
5387        let restored_account = provider_rw
5388            .tx
5389            .cursor_read::<tables::HashedAccounts>()
5390            .unwrap()
5391            .seek_exact(hashed_address)
5392            .unwrap();
5393        assert!(
5394            restored_account.is_none(),
5395            "v2: account should be removed (didn't exist before block 1)"
5396        );
5397
5398        let storage_gone = provider_rw
5399            .tx
5400            .cursor_dup_read::<tables::HashedStorages>()
5401            .unwrap()
5402            .seek_by_key_subkey(hashed_address, hashed_slot)
5403            .unwrap();
5404        assert!(
5405            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5406            "v2: storage should be reverted (removed or different key)"
5407        );
5408
5409        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5410        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5411
5412        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5413        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5414    }
5415
5416    #[test]
5417    fn test_unwind_storage_history_indices_v2() {
5418        let factory = create_test_provider_factory();
5419        factory.set_storage_settings_cache(StorageSettings::v2());
5420
5421        let address = Address::with_last_byte(1);
5422        let slot_key = B256::from(U256::from(42));
5423
5424        {
5425            let rocksdb = factory.rocksdb_provider();
5426            let mut batch = rocksdb.batch();
5427            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5428            batch.commit().unwrap();
5429
5430            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5431            assert!(!shards.is_empty(), "history should be written to rocksdb");
5432        }
5433
5434        let provider_rw = factory.provider_rw().unwrap();
5435
5436        let changesets = vec![
5437            (
5438                BlockNumberAddress((7, address)),
5439                StorageEntry { key: slot_key, value: U256::from(5) },
5440            ),
5441            (
5442                BlockNumberAddress((10, address)),
5443                StorageEntry { key: slot_key, value: U256::from(8) },
5444            ),
5445        ];
5446
5447        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5448        assert_eq!(count, 2);
5449
5450        provider_rw.commit().unwrap();
5451
5452        let rocksdb = factory.rocksdb_provider();
5453        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5454
5455        assert!(
5456            !shards.is_empty(),
5457            "history shards should still exist with block 3 after partial unwind"
5458        );
5459
5460        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5461        assert!(all_blocks.contains(&3), "block 3 should remain");
5462        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5463        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5464    }
5465}