Skip to main content

reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::{Database, ReaderTxnTracker},
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65    StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use smallvec::SmallVec;
77use std::{
78    cmp::Ordering,
79    collections::{BTreeMap, BTreeSet},
80    fmt::Debug,
81    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82    path::PathBuf,
83    sync::Arc,
84};
85use tracing::{debug, instrument, trace};
86
/// Selects the order in which the storage backends are committed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Regular ordering: static files are committed first, then `RocksDB`, then MDBX.
    #[default]
    Normal,
    /// Unwind ordering: MDBX is committed first, then `RocksDB`, then static files.
    ///
    /// Unwind operations use this ordering so that recovery is possible by truncating static
    /// files on restart.
    Unwind,
}

impl CommitOrder {
    /// Reports whether this value is [`CommitOrder::Unwind`].
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
104
105/// A [`DatabaseProvider`] that holds a read-only database transaction.
106pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
107
108/// A [`DatabaseProvider`] that holds a read-write database transaction.
109///
110/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
111/// Once that issue is solved, we can probably revert back to being an alias type.
112#[derive(Debug)]
113pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
114    pub DatabaseProvider<<DB as Database>::TXMut, N>,
115);
116
117impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
118    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
119
120    fn deref(&self) -> &Self::Target {
121        &self.0
122    }
123}
124
125impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
126    fn deref_mut(&mut self) -> &mut Self::Target {
127        &mut self.0
128    }
129}
130
131impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
132    for DatabaseProviderRW<DB, N>
133{
134    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
135        &self.0
136    }
137}
138
139impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
140    /// Commit database transaction and static file if it exists.
141    pub fn commit(self) -> ProviderResult<()> {
142        self.0.commit()
143    }
144
145    /// Consume `DbTx` or `DbTxMut`.
146    pub fn into_tx(self) -> <DB as Database>::TXMut {
147        self.0.into_tx()
148    }
149
150    /// Override the minimum pruning distance for testing purposes.
151    #[cfg(any(test, feature = "test-utils"))]
152    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
153        self.0.minimum_pruning_distance = distance;
154        self
155    }
156}
157
158impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
159    for DatabaseProvider<<DB as Database>::TXMut, N>
160{
161    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
162        provider.0
163    }
164}
165
/// Controls how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Everything is written: block structure, receipts, state and trie.
    ///
    /// This is the mode used by engine/production code.
    Full,
    /// Only the block structure (headers, transactions, senders, indices) is written.
    ///
    /// Receipts, state and trie are skipped; they may be provided later through separate calls.
    /// This is the mode used by `insert_block`.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` only for [`SaveBlocksMode::Full`], the one mode that writes state.
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
184
185/// A provider struct that fetches data from the database.
186/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
187pub struct DatabaseProvider<TX, N: NodeTypes> {
188    /// Database transaction.
189    tx: TX,
190    /// Chain spec
191    chain_spec: Arc<N::ChainSpec>,
192    /// Static File provider
193    static_file_provider: StaticFileProvider<N::Primitives>,
194    /// Pruning configuration
195    prune_modes: PruneModes,
196    /// Node storage handler.
197    storage: Arc<N::Storage>,
198    /// Storage configuration settings for this node
199    storage_settings: Arc<RwLock<StorageSettings>>,
200    /// `RocksDB` provider
201    rocksdb_provider: RocksDBProvider,
202    /// Changeset cache for trie unwinding
203    changeset_cache: ChangesetCache,
204    /// Task runtime for spawning parallel I/O work.
205    runtime: reth_tasks::Runtime,
206    /// Path to the database directory.
207    db_path: PathBuf,
208    /// Pending `RocksDB` batches to be committed at provider commit time.
209    pending_rocksdb_batches: PendingRocksDBBatches,
210    /// Commit order for database operations.
211    commit_order: CommitOrder,
212    /// Minimum distance from tip required for pruning
213    minimum_pruning_distance: u64,
214    /// Database provider metrics
215    metrics: metrics::DatabaseProviderMetrics,
216    /// Database handle used to inspect active MDBX readers during unwind commits.
217    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
218}
219
220impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        let mut s = f.debug_struct("DatabaseProvider");
223        s.field("tx", &self.tx)
224            .field("chain_spec", &self.chain_spec)
225            .field("static_file_provider", &self.static_file_provider)
226            .field("prune_modes", &self.prune_modes)
227            .field("storage", &self.storage)
228            .field("storage_settings", &self.storage_settings)
229            .field("rocksdb_provider", &self.rocksdb_provider)
230            .field("changeset_cache", &self.changeset_cache)
231            .field("runtime", &self.runtime)
232            .field("pending_rocksdb_batches", &"<pending batches>")
233            .field("commit_order", &self.commit_order)
234            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
235            .field("reader_txn_tracker", &"<reader txn tracker>")
236            .finish()
237    }
238}
239
240impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
241    /// Returns reference to prune modes.
242    pub const fn prune_modes_ref(&self) -> &PruneModes {
243        &self.prune_modes
244    }
245
246    /// Sets the minimum pruning distance.
247    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
248        self.minimum_pruning_distance = distance;
249        self
250    }
251
252    /// Attaches reader tracking so unwind commits can wait on active readers.
253    pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
254    where
255        T: ReaderTxnTracker + 'static,
256    {
257        self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
258        self
259    }
260}
261
262impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
263    /// Commits unwind writes in MDBX -> `RocksDB` -> static-file order.
264    ///
265    /// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
266    /// truncating static files from checkpoints on the next startup.
267    ///
268    /// This waits after the MDBX commit so readers holding older MDBX-visible views cannot overlap
269    /// later cross-store unwind steps.
270    ///
271    /// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
272    /// so no additional post-`RocksDB` wait is needed before static-file commit.
273    fn commit_unwind(self) -> ProviderResult<()> {
274        let storage_v2 = self.cached_storage_settings().storage_v2;
275        let reader_txn_tracker = self.reader_txn_tracker.clone();
276        self.tx.commit()?;
277
278        if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
279            reader_txn_tracker.wait_for_pre_commit_readers();
280        }
281
282        if storage_v2 {
283            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
284            for batch in batches {
285                self.rocksdb_provider.commit_batch(batch)?;
286            }
287        }
288
289        self.static_file_provider.commit()?;
290        Ok(())
291    }
292
293    /// State provider for latest state
294    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
295        trace!(target: "providers::db", "Returning latest state provider");
296        Box::new(LatestStateProviderRef::new(self))
297    }
298
299    /// Storage provider for state at that given block hash
300    pub fn history_by_block_hash<'a>(
301        &'a self,
302        block_hash: BlockHash,
303    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
304        let block_number =
305            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
306        self.history_by_block_number(block_number)
307    }
308
309    /// Storage provider for state at that given block number
310    pub fn history_by_block_number<'a>(
311        &'a self,
312        mut block_number: BlockNumber,
313    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
314        if block_number == self.best_block_number().unwrap_or_default() &&
315            block_number == self.last_block_number().unwrap_or_default()
316        {
317            return Ok(Box::new(LatestStateProviderRef::new(self)))
318        }
319
320        // +1 as the changeset that we want is the one that was applied after this block.
321        block_number += 1;
322
323        let account_history_prune_checkpoint =
324            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
325        let storage_history_prune_checkpoint =
326            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
327
328        let mut state_provider =
329            HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
330        // If we pruned account or storage history, we can't return state on every historical block.
331        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
332        if let Some(prune_checkpoint_block_number) =
333            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
334        {
335            state_provider = state_provider.with_lowest_available_account_history_block_number(
336                prune_checkpoint_block_number + 1,
337            );
338        }
339        if let Some(prune_checkpoint_block_number) =
340            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
341        {
342            state_provider = state_provider.with_lowest_available_storage_history_block_number(
343                prune_checkpoint_block_number + 1,
344            );
345        }
346
347        Ok(Box::new(state_provider))
348    }
349
350    #[cfg(feature = "test-utils")]
351    /// Sets the prune modes for provider.
352    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
353        self.prune_modes = prune_modes;
354    }
355}
356
357impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
358    type Primitives = N::Primitives;
359}
360
361impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
362    /// Returns a static file provider
363    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
364        self.static_file_provider.clone()
365    }
366
367    fn get_static_file_writer(
368        &self,
369        block: BlockNumber,
370        segment: StaticFileSegment,
371    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
372        self.static_file_provider.get_writer(block, segment)
373    }
374}
375
376impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
377    /// Returns the `RocksDB` provider.
378    fn rocksdb_provider(&self) -> RocksDBProvider {
379        self.rocksdb_provider.clone()
380    }
381
382    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
383        self.pending_rocksdb_batches.lock().push(batch);
384    }
385
386    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
387        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
388        for batch in batches {
389            self.rocksdb_provider.commit_batch(batch)?;
390        }
391        Ok(())
392    }
393}
394
395impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
396    for DatabaseProvider<TX, N>
397{
398    type ChainSpec = N::ChainSpec;
399
400    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
401        self.chain_spec.clone()
402    }
403}
404
405impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
406    /// Creates a provider with an inner read-write transaction.
407    #[expect(clippy::too_many_arguments)]
408    fn new_rw_inner(
409        tx: TX,
410        chain_spec: Arc<N::ChainSpec>,
411        static_file_provider: StaticFileProvider<N::Primitives>,
412        prune_modes: PruneModes,
413        storage: Arc<N::Storage>,
414        storage_settings: Arc<RwLock<StorageSettings>>,
415        rocksdb_provider: RocksDBProvider,
416        changeset_cache: ChangesetCache,
417        runtime: reth_tasks::Runtime,
418        db_path: PathBuf,
419        commit_order: CommitOrder,
420    ) -> Self {
421        Self {
422            tx,
423            chain_spec,
424            static_file_provider,
425            prune_modes,
426            storage,
427            storage_settings,
428            rocksdb_provider,
429            changeset_cache,
430            runtime,
431            db_path,
432            pending_rocksdb_batches: Default::default(),
433            commit_order,
434            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
435            metrics: metrics::DatabaseProviderMetrics::default(),
436            reader_txn_tracker: None,
437        }
438    }
439
440    /// Creates a provider with an inner read-write transaction using normal commit order.
441    #[expect(clippy::too_many_arguments)]
442    pub fn new_rw(
443        tx: TX,
444        chain_spec: Arc<N::ChainSpec>,
445        static_file_provider: StaticFileProvider<N::Primitives>,
446        prune_modes: PruneModes,
447        storage: Arc<N::Storage>,
448        storage_settings: Arc<RwLock<StorageSettings>>,
449        rocksdb_provider: RocksDBProvider,
450        changeset_cache: ChangesetCache,
451        runtime: reth_tasks::Runtime,
452        db_path: PathBuf,
453    ) -> Self {
454        Self::new_rw_inner(
455            tx,
456            chain_spec,
457            static_file_provider,
458            prune_modes,
459            storage,
460            storage_settings,
461            rocksdb_provider,
462            changeset_cache,
463            runtime,
464            db_path,
465            CommitOrder::Normal,
466        )
467    }
468
469    /// Creates a provider with an inner read-write transaction using unwind commit order.
470    #[expect(clippy::too_many_arguments)]
471    pub fn new_unwind_rw(
472        tx: TX,
473        chain_spec: Arc<N::ChainSpec>,
474        static_file_provider: StaticFileProvider<N::Primitives>,
475        prune_modes: PruneModes,
476        storage: Arc<N::Storage>,
477        storage_settings: Arc<RwLock<StorageSettings>>,
478        rocksdb_provider: RocksDBProvider,
479        changeset_cache: ChangesetCache,
480        runtime: reth_tasks::Runtime,
481        db_path: PathBuf,
482    ) -> Self {
483        Self::new_rw_inner(
484            tx,
485            chain_spec,
486            static_file_provider,
487            prune_modes,
488            storage,
489            storage_settings,
490            rocksdb_provider,
491            changeset_cache,
492            runtime,
493            db_path,
494            CommitOrder::Unwind,
495        )
496    }
497}
498
499impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
500    fn as_ref(&self) -> &Self {
501        self
502    }
503}
504
505impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
506    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
507    ///
508    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
509    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
510    where
511        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
512    {
513        let rocksdb = self.rocksdb_provider();
514        let rocksdb_batch = rocksdb.batch();
515
516        let (result, raw_batch) = f(rocksdb_batch)?;
517
518        if let Some(batch) = raw_batch {
519            self.set_pending_rocksdb_batch(batch);
520        }
521        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
522
523        Ok(result)
524    }
525
526    /// Creates the context for static file writes.
527    fn static_file_write_ctx(
528        &self,
529        save_mode: SaveBlocksMode,
530        first_block: BlockNumber,
531        last_block: BlockNumber,
532    ) -> ProviderResult<StaticFileWriteCtx> {
533        let tip = self.last_block_number()?.max(last_block);
534        Ok(StaticFileWriteCtx {
535            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
536                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
537            write_receipts: save_mode.with_state() &&
538                EitherWriter::receipts_destination(self).is_static_file(),
539            write_account_changesets: save_mode.with_state() &&
540                EitherWriterDestination::account_changesets(self).is_static_file(),
541            write_storage_changesets: save_mode.with_state() &&
542                EitherWriterDestination::storage_changesets(self).is_static_file(),
543            tip,
544            receipts_prune_mode: self.prune_modes.receipts,
545            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
546            receipts_prunable: self
547                .static_file_provider
548                .get_highest_static_file_tx(StaticFileSegment::Receipts)
549                .is_none() &&
550                PruneMode::Distance(self.minimum_pruning_distance)
551                    .should_prune(first_block, tip),
552        })
553    }
554
555    /// Creates the context for `RocksDB` writes.
556    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
557        RocksDBWriteCtx {
558            first_block_number: first_block,
559            prune_tx_lookup: self.prune_modes.transaction_lookup,
560            storage_settings: self.cached_storage_settings(),
561            pending_batches: self.pending_rocksdb_batches.clone(),
562        }
563    }
564
565    /// Writes executed blocks and state to storage.
566    ///
567    /// This method parallelizes static file (SF) writes with MDBX writes.
568    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
569    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
570    ///
571    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
572    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
573    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
574    pub fn save_blocks(
575        &self,
576        blocks: Vec<ExecutedBlock<N::Primitives>>,
577        save_mode: SaveBlocksMode,
578    ) -> ProviderResult<()> {
579        if blocks.is_empty() {
580            debug!(target: "providers::db", "Attempted to write empty block range");
581            return Ok(())
582        }
583
584        let total_start = Instant::now();
585        let block_count = blocks.len() as u64;
586        let first_number = blocks.first().unwrap().recovered_block().number();
587        let last_block_number = blocks.last().unwrap().recovered_block().number();
588
589        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
590
591        // Compute tx_nums upfront (both threads need these)
592        let first_tx_num = self
593            .tx
594            .cursor_read::<tables::TransactionBlocks>()?
595            .last()?
596            .map(|(n, _)| n + 1)
597            .unwrap_or_default();
598
599        let tx_nums: SmallVec<[TxNumber; 4]> = {
600            let mut nums = SmallVec::with_capacity(blocks.len());
601            let mut current = first_tx_num;
602            for block in &blocks {
603                nums.push(current);
604                current += block.recovered_block().body().transaction_count() as u64;
605            }
606            nums
607        };
608
609        let mut timings =
610            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
611
612        // avoid capturing &self.tx in scope below.
613        let sf_provider = &self.static_file_provider;
614        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
615        let rocksdb_provider = self.rocksdb_provider.clone();
616        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
617        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
618
619        let mut sf_result = None;
620        let mut rocksdb_result = None;
621
622        // Write to all backends in parallel.
623        let runtime = &self.runtime;
624        // Propagate tracing context into rayon-spawned threads so that static file
625        // and RocksDB write spans appear as children of save_blocks in traces.
626        let span = tracing::Span::current();
627        runtime.storage_pool().in_place_scope(|s| {
628            // SF writes
629            s.spawn(|_| {
630                let _guard = span.enter();
631                let start = Instant::now();
632                sf_result = Some(
633                    sf_provider
634                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
635                        .map(|()| start.elapsed()),
636                );
637            });
638
639            // RocksDB writes
640            if rocksdb_enabled {
641                s.spawn(|_| {
642                    let _guard = span.enter();
643                    let start = Instant::now();
644                    rocksdb_result = Some(
645                        rocksdb_provider
646                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
647                            .map(|()| start.elapsed()),
648                    );
649                });
650            }
651
652            // MDBX writes
653            let mdbx_start = Instant::now();
654
655            // Collect all transaction hashes across all blocks, sort them, and write in batch
656            if !self.cached_storage_settings().storage_v2 &&
657                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
658            {
659                let start = Instant::now();
660                let total_tx_count: usize =
661                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
662                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
663                for (i, block) in blocks.iter().enumerate() {
664                    let recovered_block = block.recovered_block();
665                    for (tx_num, transaction) in
666                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
667                    {
668                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
669                    }
670                }
671
672                // Sort by hash for optimal MDBX insertion performance
673                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
674
675                // Write all transaction hash numbers in a single batch
676                self.with_rocksdb_batch(|batch| {
677                    let mut tx_hash_writer =
678                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
679                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
680                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
681                    Ok(((), raw_batch))
682                })?;
683                self.metrics.record_duration(
684                    metrics::Action::InsertTransactionHashNumbers,
685                    start.elapsed(),
686                );
687            }
688
689            for (i, block) in blocks.iter().enumerate() {
690                let recovered_block = block.recovered_block();
691
692                let start = Instant::now();
693                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
694                timings.insert_block += start.elapsed();
695
696                if save_mode.with_state() {
697                    let execution_output = block.execution_outcome();
698
699                    // Write state and changesets to the database.
700                    // Must be written after blocks because of the receipt lookup.
701                    // Skip receipts/account changesets if they're being written to static files.
702                    let start = Instant::now();
703                    self.write_state(
704                        WriteStateInput::Single {
705                            outcome: execution_output,
706                            block: recovered_block.number(),
707                        },
708                        OriginalValuesKnown::No,
709                        StateWriteConfig {
710                            write_receipts: !sf_ctx.write_receipts,
711                            write_account_changesets: !sf_ctx.write_account_changesets,
712                            write_storage_changesets: !sf_ctx.write_storage_changesets,
713                        },
714                    )?;
715                    timings.write_state += start.elapsed();
716                }
717            }
718
719            // Write all hashed state and trie updates in single batches.
720            // This reduces cursor open/close overhead from N calls to 1.
721            if save_mode.with_state() {
722                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
723                let start = Instant::now();
724                let merged_hashed_state = HashedPostStateSorted::merge_batch(
725                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
726                );
727                if !merged_hashed_state.is_empty() {
728                    self.write_hashed_state(&merged_hashed_state)?;
729                }
730                timings.write_hashed_state += start.elapsed();
731
732                let start = Instant::now();
733                let merged_trie =
734                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
735                if !merged_trie.is_empty() {
736                    self.write_trie_updates_sorted(&merged_trie)?;
737                }
738                timings.write_trie_updates += start.elapsed();
739            }
740
741            // Full mode: update history indices
742            if save_mode.with_state() {
743                let start = Instant::now();
744                self.update_history_indices(first_number..=last_block_number)?;
745                timings.update_history_indices = start.elapsed();
746            }
747
748            // Update pipeline progress
749            let start = Instant::now();
750            self.update_pipeline_stages(last_block_number, false)?;
751            timings.update_pipeline_stages = start.elapsed();
752
753            timings.mdbx = mdbx_start.elapsed();
754
755            Ok::<_, ProviderError>(())
756        })?;
757
758        // Collect results from spawned tasks
759        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
760
761        if rocksdb_enabled {
762            timings.rocksdb = rocksdb_result.ok_or_else(|| {
763                ProviderError::Database(reth_db_api::DatabaseError::Other(
764                    "RocksDB thread panicked".into(),
765                ))
766            })??;
767        }
768
769        timings.total = total_start.elapsed();
770
771        self.metrics.record_save_blocks(&timings);
772        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
773
774        Ok(())
775    }
776
777    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
778    ///
779    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
780    #[instrument(level = "debug", target = "providers::db", skip_all)]
781    fn insert_block_mdbx_only(
782        &self,
783        block: &RecoveredBlock<BlockTy<N>>,
784        first_tx_num: TxNumber,
785    ) -> ProviderResult<StoredBlockBodyIndices> {
786        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
787            EitherWriterDestination::senders(self).is_database()
788        {
789            let start = Instant::now();
790            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
791            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
792            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
793                cursor.append(tx_num, &sender)?;
794            }
795            self.metrics
796                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
797        }
798
799        let block_number = block.number();
800        let tx_count = block.body().transaction_count() as u64;
801
802        let start = Instant::now();
803        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
804        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
805
806        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
807
808        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
809    }
810
811    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
812    /// `Ommers`/`Withdrawals`).
813    fn write_block_body_indices(
814        &self,
815        block_number: BlockNumber,
816        body: &BodyTy<N>,
817        first_tx_num: TxNumber,
818        tx_count: u64,
819    ) -> ProviderResult<()> {
820        // MDBX: BlockBodyIndices
821        let start = Instant::now();
822        self.tx
823            .cursor_write::<tables::BlockBodyIndices>()?
824            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
825        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
826
827        // MDBX: TransactionBlocks (last tx -> block mapping)
828        if tx_count > 0 {
829            let start = Instant::now();
830            self.tx
831                .cursor_write::<tables::TransactionBlocks>()?
832                .append(first_tx_num + tx_count - 1, &block_number)?;
833            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
834        }
835
836        // MDBX: Ommers/Withdrawals
837        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
838
839        Ok(())
840    }
841
842    /// Unwinds trie state starting at and including the given block.
843    ///
844    /// This includes calculating the resulted state root and comparing it with the parent block
845    /// state root.
846    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
847        let changed_accounts = self.account_changesets_range(from..)?;
848
849        // Unwind account hashes.
850        self.unwind_account_hashing(changed_accounts.iter())?;
851
852        // Unwind account history indices.
853        self.unwind_account_history_indices(changed_accounts.iter())?;
854
855        let changed_storages = self.storage_changesets_range(from..)?;
856
857        // Unwind storage hashes.
858        self.unwind_storage_hashing(changed_storages.iter().copied())?;
859
860        // Unwind storage history indices.
861        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
862
863        // Unwind accounts/storages trie tables using the revert.
864        // Get the database tip block number
865        let db_tip_block = self
866            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
867            .as_ref()
868            .map(|chk| chk.block_number)
869            .ok_or_else(|| ProviderError::InsufficientChangesets {
870                requested: from,
871                available: 0..=0,
872            })?;
873
874        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
875        self.write_trie_updates_sorted(&trie_revert)?;
876
877        Ok(())
878    }
879
880    /// Removes receipts from all transactions starting with provided number (inclusive).
881    fn remove_receipts_from(
882        &self,
883        from_tx: TxNumber,
884        last_block: BlockNumber,
885    ) -> ProviderResult<()> {
886        // iterate over block body and remove receipts
887        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
888
889        if EitherWriter::receipts_destination(self).is_static_file() {
890            let static_file_receipt_num =
891                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
892
893            let to_delete = static_file_receipt_num
894                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
895                .unwrap_or_default();
896
897            self.static_file_provider
898                .latest_writer(StaticFileSegment::Receipts)?
899                .prune_receipts(to_delete, last_block)?;
900        }
901
902        Ok(())
903    }
904
905    /// Writes bytecodes to MDBX.
906    fn write_bytecodes(
907        &self,
908        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
909    ) -> ProviderResult<()> {
910        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
911        for (hash, bytecode) in bytecodes {
912            bytecodes_cursor.upsert(hash, &bytecode)?;
913        }
914        Ok(())
915    }
916}
917
918impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
919    fn try_into_history_at_block(
920        self,
921        mut block_number: BlockNumber,
922    ) -> ProviderResult<StateProviderBox> {
923        let best_block = self.best_block_number().unwrap_or_default();
924
925        // Reject requests for blocks beyond the best block
926        if block_number > best_block {
927            return Err(ProviderError::BlockNotExecuted {
928                requested: block_number,
929                executed: best_block,
930            });
931        }
932
933        // If requesting state at the best block, use the latest state provider
934        if block_number == best_block {
935            return Ok(Box::new(LatestStateProvider::new(self)));
936        }
937
938        // +1 as the changeset that we want is the one that was applied after this block.
939        block_number += 1;
940
941        let account_history_prune_checkpoint =
942            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
943        let storage_history_prune_checkpoint =
944            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
945        let changeset_cache = self.changeset_cache.clone();
946
947        let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
948
949        // If we pruned account or storage history, we can't return state on every historical block.
950        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
951        if let Some(prune_checkpoint_block_number) =
952            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
953        {
954            state_provider = state_provider.with_lowest_available_account_history_block_number(
955                prune_checkpoint_block_number + 1,
956            );
957        }
958        if let Some(prune_checkpoint_block_number) =
959            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
960        {
961            state_provider = state_provider.with_lowest_available_storage_history_block_number(
962                prune_checkpoint_block_number + 1,
963            );
964        }
965
966        Ok(Box::new(state_provider))
967    }
968}
969
970/// For a given key, unwind all history shards that contain block numbers at or above the given
971/// block number.
972///
973/// S - Sharded key subtype.
974/// T - Table to walk over.
975/// C - Cursor implementation.
976///
977/// This function walks the entries from the given start key and deletes all shards that belong to
978/// the key and contain block numbers at or above the given block number. Shards entirely below
979/// the block number are preserved.
980///
981/// The boundary shard (the shard that spans across the block number) is removed from the database.
982/// Any indices that are below the block number are filtered out and returned for reinsertion.
983/// The boundary shard is returned for reinsertion (if it's not empty).
984fn unwind_history_shards<S, T, C>(
985    cursor: &mut C,
986    start_key: T::Key,
987    block_number: BlockNumber,
988    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
989) -> ProviderResult<Vec<u64>>
990where
991    T: Table<Value = BlockNumberList>,
992    T::Key: AsRef<ShardedKey<S>>,
993    C: DbCursorRO<T> + DbCursorRW<T>,
994{
995    // Start from the given key and iterate through shards
996    let mut item = cursor.seek_exact(start_key)?;
997    while let Some((sharded_key, list)) = item {
998        // If the shard does not belong to the key, break.
999        if !shard_belongs_to_key(&sharded_key) {
1000            break
1001        }
1002
1003        // Always delete the current shard from the database first
1004        // We'll decide later what (if anything) to reinsert
1005        cursor.delete_current()?;
1006
1007        // Get the first (lowest) block number in this shard
1008        // All block numbers in a shard are sorted in ascending order
1009        let first = list.iter().next().expect("List can't be empty");
1010
1011        // Case 1: Entire shard is at or above the unwinding point
1012        // Keep it deleted (don't return anything for reinsertion)
1013        if first >= block_number {
1014            item = cursor.prev()?;
1015            continue
1016        }
1017        // Case 2: This is a boundary shard (spans across the unwinding point)
1018        // The shard contains some blocks below and some at/above the unwinding point
1019        else if block_number <= sharded_key.as_ref().highest_block_number {
1020            // Return only the block numbers that are below the unwinding point
1021            // These will be reinserted to preserve the historical data
1022            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1023        }
1024        // Case 3: Entire shard is below the unwinding point
1025        // Return all block numbers for reinsertion (preserve entire shard)
1026        return Ok(list.iter().collect::<Vec<_>>())
1027    }
1028
1029    // No shards found or all processed
1030    Ok(Vec::new())
1031}
1032
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider around the given transaction and its supporting components.
    ///
    /// All arguments are stored as-is. The remaining fields are initialized as follows:
    /// `pending_rocksdb_batches` is default-constructed, `commit_order` is
    /// [`CommitOrder::Normal`], `minimum_pruning_distance` is [`MINIMUM_UNWIND_SAFE_DISTANCE`],
    /// `metrics` uses the default metrics instance, and no reader transaction tracker is set.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Consumes the provider and returns the inner transaction (`DbTx` or `DbTxMut`).
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the inner transaction (`DbTx` or `DbTxMut`).
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the inner transaction (`DbTx` or `DbTxMut`).
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1087
1088impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1089    fn recovered_block<H, HF, B, BF>(
1090        &self,
1091        id: BlockHashOrNumber,
1092        _transaction_kind: TransactionVariant,
1093        header_by_number: HF,
1094        construct_block: BF,
1095    ) -> ProviderResult<Option<B>>
1096    where
1097        H: AsRef<HeaderTy<N>>,
1098        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1099        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1100    {
1101        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1102        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1103
1104        // Get the block body
1105        //
1106        // If the body indices are not found, this means that the transactions either do not exist
1107        // in the database yet, or they do exit but are not indexed. If they exist but are not
1108        // indexed, we don't have enough information to return the block anyways, so we return
1109        // `None`.
1110        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1111
1112        let tx_range = body.tx_num_range();
1113
1114        let transactions = if tx_range.is_empty() {
1115            vec![]
1116        } else {
1117            self.transactions_by_tx_range(tx_range.clone())?
1118        };
1119
1120        let body = self
1121            .storage
1122            .reader()
1123            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1124            .pop()
1125            .ok_or(ProviderError::InvalidStorageOutput)?;
1126
1127        let senders = if tx_range.is_empty() {
1128            vec![]
1129        } else {
1130            let known_senders: HashMap<TxNumber, Address> =
1131                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1132
1133            let mut senders = Vec::with_capacity(body.transactions().len());
1134            for (tx_num, tx) in tx_range.zip(body.transactions()) {
1135                match known_senders.get(&tx_num) {
1136                    None => {
1137                        let sender = tx.recover_signer_unchecked()?;
1138                        senders.push(sender);
1139                    }
1140                    Some(sender) => senders.push(*sender),
1141                }
1142            }
1143            senders
1144        };
1145
1146        construct_block(header, body, senders)
1147    }
1148
1149    /// Returns a range of blocks from the database.
1150    ///
1151    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1152    /// construct blocks from the following inputs:
1153    ///     – Header
1154    ///     - Range of transaction numbers
1155    ///     – Ommers
1156    ///     – Withdrawals
1157    ///     – Senders
1158    fn block_range<F, H, HF, R>(
1159        &self,
1160        range: RangeInclusive<BlockNumber>,
1161        headers_range: HF,
1162        mut assemble_block: F,
1163    ) -> ProviderResult<Vec<R>>
1164    where
1165        H: AsRef<HeaderTy<N>>,
1166        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1167        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1168    {
1169        if range.is_empty() {
1170            return Ok(Vec::new())
1171        }
1172
1173        let len = range.end().saturating_sub(*range.start()) as usize + 1;
1174        let mut blocks = Vec::with_capacity(len);
1175
1176        let headers = headers_range(range.clone())?;
1177
1178        // If the body indices are not found, this means that the transactions either do
1179        // not exist in the database yet, or they do exit but are
1180        // not indexed. If they exist but are not indexed, we don't
1181        // have enough information to return the block anyways, so
1182        // we skip the block.
1183        let present_headers = self
1184            .block_body_indices_range(range)?
1185            .into_iter()
1186            .map(|b| b.tx_num_range())
1187            .zip(headers)
1188            .collect::<Vec<_>>();
1189
1190        let mut inputs = Vec::with_capacity(present_headers.len());
1191        for (tx_range, header) in &present_headers {
1192            let transactions = if tx_range.is_empty() {
1193                Vec::new()
1194            } else {
1195                self.transactions_by_tx_range(tx_range.clone())?
1196            };
1197
1198            inputs.push((header.as_ref(), transactions));
1199        }
1200
1201        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1202
1203        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1204            blocks.push(assemble_block(header, body, tx_range)?);
1205        }
1206
1207        Ok(blocks)
1208    }
1209
1210    /// Returns a range of blocks from the database, along with the senders of each
1211    /// transaction in the blocks.
1212    ///
1213    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1214    /// construct blocks from the following inputs:
1215    ///     – Header
1216    ///     - Transactions
1217    ///     – Ommers
1218    ///     – Withdrawals
1219    ///     – Senders
1220    fn block_with_senders_range<H, HF, B, BF>(
1221        &self,
1222        range: RangeInclusive<BlockNumber>,
1223        headers_range: HF,
1224        assemble_block: BF,
1225    ) -> ProviderResult<Vec<B>>
1226    where
1227        H: AsRef<HeaderTy<N>>,
1228        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1229        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1230    {
1231        self.block_range(range, headers_range, |header, body, tx_range| {
1232            let senders = if tx_range.is_empty() {
1233                Vec::new()
1234            } else {
1235                let known_senders: HashMap<TxNumber, Address> =
1236                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1237
1238                let mut senders = Vec::with_capacity(body.transactions().len());
1239                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1240                    match known_senders.get(&tx_num) {
1241                        None => {
1242                            // recover the sender from the transaction if not found
1243                            let sender = tx.recover_signer_unchecked()?;
1244                            senders.push(sender);
1245                        }
1246                        Some(sender) => senders.push(*sender),
1247                    }
1248                }
1249
1250                senders
1251            };
1252
1253            assemble_block(header, body, senders)
1254        })
1255    }
1256
1257    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
1258    /// [`tables::PlainAccountState`] and [`tables::PlainStorageState`] tables, based on the given
1259    /// storage and account changesets.
1260    fn populate_bundle_state(
1261        &self,
1262        account_changeset: Vec<(u64, AccountBeforeTx)>,
1263        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1264        mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
1265        mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
1266    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1267        // iterate previous value and get plain state value to create changeset
1268        // Double option around Account represent if Account state is know (first option) and
1269        // account is removed (Second Option)
1270        let mut state: BundleStateInit = HashMap::default();
1271
1272        // This is not working for blocks that are not at tip. as plain state is not the last
1273        // state of end range. We should rename the functions or add support to access
1274        // History state. Accessing history state can be tricky but we are not gaining
1275        // anything.
1276
1277        let mut reverts: RevertsInit = HashMap::default();
1278
1279        // add account changeset changes
1280        for (block_number, account_before) in account_changeset.into_iter().rev() {
1281            let AccountBeforeTx { info: old_info, address } = account_before;
1282            match state.entry(address) {
1283                hash_map::Entry::Vacant(entry) => {
1284                    let new_info = get_account(address)?;
1285                    entry.insert((old_info, new_info, HashMap::default()));
1286                }
1287                hash_map::Entry::Occupied(mut entry) => {
1288                    // overwrite old account state.
1289                    entry.get_mut().0 = old_info;
1290                }
1291            }
1292            // insert old info into reverts.
1293            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1294        }
1295
1296        // add storage changeset changes
1297        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1298            let BlockNumberAddress((block_number, address)) = block_and_address;
1299            // get account state or insert from plain state.
1300            let account_state = match state.entry(address) {
1301                hash_map::Entry::Vacant(entry) => {
1302                    let present_info = get_account(address)?;
1303                    entry.insert((present_info, present_info, HashMap::default()))
1304                }
1305                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1306            };
1307
1308            // match storage.
1309            match account_state.2.entry(old_storage.key) {
1310                hash_map::Entry::Vacant(entry) => {
1311                    let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
1312                    entry.insert((old_storage.value, new_storage));
1313                }
1314                hash_map::Entry::Occupied(mut entry) => {
1315                    entry.get_mut().0 = old_storage.value;
1316                }
1317            };
1318
1319            reverts
1320                .entry(block_number)
1321                .or_default()
1322                .entry(address)
1323                .or_default()
1324                .1
1325                .push(old_storage);
1326        }
1327
1328        Ok((state, reverts))
1329    }
1330
1331    /// Invokes [`populate_bundle_state`](Self::populate_bundle_state) with the given plain state
1332    /// cursors.
1333    fn populate_bundle_state_plain(
1334        &self,
1335        account_changeset: Vec<(u64, AccountBeforeTx)>,
1336        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1337        plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1338        plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1339    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1340        self.populate_bundle_state(
1341            account_changeset,
1342            storage_changeset,
1343            |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1344            |address, storage_key| {
1345                Ok(plain_storage_cursor
1346                    .seek_by_key_subkey(address, storage_key)?
1347                    .filter(|s| s.key == storage_key)
1348                    .map(|s| s.value))
1349            },
1350        )
1351    }
1352
1353    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
1354    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
1355    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
1356    /// plain address and plain storage key.
1357    fn populate_bundle_state_hashed(
1358        &self,
1359        account_changeset: Vec<(u64, AccountBeforeTx)>,
1360        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1361        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1362        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1363    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1364        self.populate_bundle_state(
1365            account_changeset,
1366            storage_changeset,
1367            |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1368            |address, storage_key| {
1369                let hashed_storage_key = keccak256(storage_key);
1370                Ok(hashed_storage_cursor
1371                    .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1372                    .filter(|s| s.key == hashed_storage_key)
1373                    .map(|s| s.value))
1374            },
1375        )
1376    }
1377
1378    fn populate_bundle_state_with_provider(
1379        &self,
1380        account_changeset: Vec<(u64, AccountBeforeTx)>,
1381        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1382        state_provider: impl StateProvider,
1383    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1384        self.populate_bundle_state(
1385            account_changeset,
1386            storage_changeset,
1387            |address| state_provider.basic_account(&address),
1388            |address, storage_key| state_provider.storage(address, storage_key),
1389        )
1390    }
1391}
1392
1393impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1394    /// Insert history index to the database.
1395    ///
1396    /// For each updated partial key, this function retrieves the last shard from the database
1397    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1398    /// the shards back into the database.
1399    ///
1400    /// This function is used by history indexing stages.
1401    fn append_history_index<P, T>(
1402        &self,
1403        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1404        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1405    ) -> ProviderResult<()>
1406    where
1407        P: Copy,
1408        T: Table<Value = BlockNumberList>,
1409    {
1410        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1411        // will append duplicate entries instead of updating existing ones, causing data corruption.
1412        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1413
1414        let mut cursor = self.tx.cursor_write::<T>()?;
1415
1416        for (partial_key, indices) in index_updates {
1417            let last_key = sharded_key_factory(partial_key, u64::MAX);
1418            let mut last_shard = cursor
1419                .seek_exact(last_key.clone())?
1420                .map(|(_, list)| list)
1421                .unwrap_or_else(BlockNumberList::empty);
1422
1423            last_shard.append(indices).map_err(ProviderError::other)?;
1424
1425            // fast path: all indices fit in one shard
1426            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1427                cursor.upsert(last_key, &last_shard)?;
1428                continue;
1429            }
1430
1431            // slow path: rechunk into multiple shards
1432            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1433            let mut chunks_peekable = chunks.into_iter().peekable();
1434
1435            while let Some(chunk) = chunks_peekable.next() {
1436                let shard = BlockNumberList::new_pre_sorted(chunk);
1437                let highest_block_number = if chunks_peekable.peek().is_some() {
1438                    shard.iter().next_back().expect("`chunks` does not return empty list")
1439                } else {
1440                    // Insert last list with `u64::MAX`.
1441                    u64::MAX
1442                };
1443
1444                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1445            }
1446        }
1447
1448        Ok(())
1449    }
1450}
1451
1452impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1453    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1454        if self.cached_storage_settings().use_hashed_state() {
1455            let hashed_address = keccak256(address);
1456            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1457        } else {
1458            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1459        }
1460    }
1461}
1462
1463impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1464    fn changed_accounts_with_range(
1465        &self,
1466        range: RangeInclusive<BlockNumber>,
1467    ) -> ProviderResult<BTreeSet<Address>> {
1468        let mut reader = EitherReader::new_account_changesets(self)?;
1469
1470        reader.changed_accounts_with_range(range)
1471    }
1472
1473    fn basic_accounts(
1474        &self,
1475        iter: impl IntoIterator<Item = Address>,
1476    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1477        if self.cached_storage_settings().use_hashed_state() {
1478            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1479            Ok(iter
1480                .into_iter()
1481                .map(|address| {
1482                    let hashed_address = keccak256(address);
1483                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1484                })
1485                .collect::<Result<Vec<_>, _>>()?)
1486        } else {
1487            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1488            Ok(iter
1489                .into_iter()
1490                .map(|address| {
1491                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1492                })
1493                .collect::<Result<Vec<_>, _>>()?)
1494        }
1495    }
1496
1497    fn changed_accounts_and_blocks_with_range(
1498        &self,
1499        range: RangeInclusive<BlockNumber>,
1500    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1501        let highest_static_block = self
1502            .static_file_provider
1503            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1504
1505        if let Some(highest) = highest_static_block &&
1506            self.cached_storage_settings().storage_v2
1507        {
1508            let start = *range.start();
1509            let static_end = (*range.end()).min(highest);
1510
1511            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1512            if start <= static_end {
1513                for block in start..=static_end {
1514                    let block_changesets = self.account_block_changeset(block)?;
1515                    for changeset in block_changesets {
1516                        changed_accounts_and_blocks
1517                            .entry(changeset.address)
1518                            .or_default()
1519                            .push(block);
1520                    }
1521                }
1522            }
1523
1524            Ok(changed_accounts_and_blocks)
1525        } else {
1526            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1527
1528            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1529                BTreeMap::new(),
1530                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1531                    let (index, account) = entry?;
1532                    accounts.entry(account.address).or_default().push(index);
1533                    Ok(accounts)
1534                },
1535            )?;
1536
1537            Ok(account_transitions)
1538        }
1539    }
1540}
1541
1542impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1543    fn storage_changeset(
1544        &self,
1545        block_number: BlockNumber,
1546    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1547        if self.cached_storage_settings().storage_v2 {
1548            self.static_file_provider.storage_changeset(block_number)
1549        } else {
1550            let range = block_number..=block_number;
1551            let storage_range = BlockNumberAddress::range(range);
1552            self.tx
1553                .cursor_dup_read::<tables::StorageChangeSets>()?
1554                .walk_range(storage_range)?
1555                .map(|r| {
1556                    let (bna, entry) = r?;
1557                    Ok((bna, entry))
1558                })
1559                .collect()
1560        }
1561    }
1562
1563    fn get_storage_before_block(
1564        &self,
1565        block_number: BlockNumber,
1566        address: Address,
1567        storage_key: B256,
1568    ) -> ProviderResult<Option<StorageEntry>> {
1569        if self.cached_storage_settings().storage_v2 {
1570            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1571        } else {
1572            Ok(self
1573                .tx
1574                .cursor_dup_read::<tables::StorageChangeSets>()?
1575                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1576                .filter(|entry| entry.key == storage_key))
1577        }
1578    }
1579
1580    fn storage_changesets_range(
1581        &self,
1582        range: impl RangeBounds<BlockNumber>,
1583    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1584        if self.cached_storage_settings().storage_v2 {
1585            self.static_file_provider.storage_changesets_range(range)
1586        } else {
1587            self.tx
1588                .cursor_dup_read::<tables::StorageChangeSets>()?
1589                .walk_range(BlockNumberAddressRange::from(range))?
1590                .map(|r| {
1591                    let (bna, entry) = r?;
1592                    Ok((bna, entry))
1593                })
1594                .collect()
1595        }
1596    }
1597}
1598
1599impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1600    fn account_block_changeset(
1601        &self,
1602        block_number: BlockNumber,
1603    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1604        if self.cached_storage_settings().storage_v2 {
1605            let static_changesets =
1606                self.static_file_provider.account_block_changeset(block_number)?;
1607            Ok(static_changesets)
1608        } else {
1609            let range = block_number..=block_number;
1610            self.tx
1611                .cursor_read::<tables::AccountChangeSets>()?
1612                .walk_range(range)?
1613                .map(|result| -> ProviderResult<_> {
1614                    let (_, account_before) = result?;
1615                    Ok(account_before)
1616                })
1617                .collect()
1618        }
1619    }
1620
1621    fn get_account_before_block(
1622        &self,
1623        block_number: BlockNumber,
1624        address: Address,
1625    ) -> ProviderResult<Option<AccountBeforeTx>> {
1626        if self.cached_storage_settings().storage_v2 {
1627            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1628        } else {
1629            self.tx
1630                .cursor_dup_read::<tables::AccountChangeSets>()?
1631                .seek_by_key_subkey(block_number, address)?
1632                .filter(|acc| acc.address == address)
1633                .map(Ok)
1634                .transpose()
1635        }
1636    }
1637
1638    fn account_changesets_range(
1639        &self,
1640        range: impl core::ops::RangeBounds<BlockNumber>,
1641    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1642        if self.cached_storage_settings().storage_v2 {
1643            self.static_file_provider.account_changesets_range(range)
1644        } else {
1645            self.tx
1646                .cursor_read::<tables::AccountChangeSets>()?
1647                .walk_range(to_range(range))?
1648                .map(|r| r.map_err(Into::into))
1649                .collect()
1650        }
1651    }
1652}
1653
1654impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1655    type Receipt = ReceiptTy<N>;
1656
1657    fn get_state(
1658        &self,
1659        block: BlockNumber,
1660    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1661        let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1662
1663        let from_transaction_num = block_body.first_tx_num();
1664        let to_transaction_num = block_body.last_tx_num();
1665
1666        let account_changeset = self.account_changesets_range(block..=block)?;
1667        let storage_changeset = self.storage_changeset(block)?;
1668
1669        let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1670        let state_provider = self.history_by_block_hash(block_hash)?;
1671        let (state, reverts) = self.populate_bundle_state_with_provider(
1672            account_changeset,
1673            storage_changeset,
1674            state_provider,
1675        )?;
1676
1677        let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1678
1679        Ok(Some(ExecutionOutcome::new_init(
1680            state,
1681            reverts,
1682            // We skip new contracts since we never delete them from the database
1683            Vec::new(),
1684            vec![receipts],
1685            block,
1686            Vec::new(),
1687        )))
1688    }
1689}
1690
1691impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1692    for DatabaseProvider<TX, N>
1693{
1694    type Header = HeaderTy<N>;
1695
1696    fn local_tip_header(
1697        &self,
1698        highest_uninterrupted_block: BlockNumber,
1699    ) -> ProviderResult<SealedHeader<Self::Header>> {
1700        let static_file_provider = self.static_file_provider();
1701
1702        // Make sure Headers static file is at the same height. If it's further, this
1703        // input execution was interrupted previously and we need to unwind the static file.
1704        let next_static_file_block_num = static_file_provider
1705            .get_highest_static_file_block(StaticFileSegment::Headers)
1706            .map(|id| id + 1)
1707            .unwrap_or_default();
1708        let next_block = highest_uninterrupted_block + 1;
1709
1710        match next_static_file_block_num.cmp(&next_block) {
1711            // The node shutdown between an executed static file commit and before the database
1712            // commit, so we need to unwind the static files.
1713            Ordering::Greater => {
1714                let mut static_file_producer =
1715                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1716                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1717                // Since this is a database <-> static file inconsistency, we commit the change
1718                // straight away.
1719                static_file_producer.commit()?
1720            }
1721            Ordering::Less => {
1722                // There's either missing or corrupted files.
1723                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1724            }
1725            Ordering::Equal => {}
1726        }
1727
1728        let local_head = static_file_provider
1729            .sealed_header(highest_uninterrupted_block)?
1730            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1731
1732        Ok(local_head)
1733    }
1734}
1735
1736impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1737    type Header = HeaderTy<N>;
1738
1739    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1740        if let Some(num) = self.block_number(block_hash)? {
1741            Ok(self.header_by_number(num)?)
1742        } else {
1743            Ok(None)
1744        }
1745    }
1746
1747    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1748        self.static_file_provider.header_by_number(num)
1749    }
1750
1751    fn headers_range(
1752        &self,
1753        range: impl RangeBounds<BlockNumber>,
1754    ) -> ProviderResult<Vec<Self::Header>> {
1755        self.static_file_provider.headers_range(range)
1756    }
1757
1758    fn sealed_header(
1759        &self,
1760        number: BlockNumber,
1761    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1762        self.static_file_provider.sealed_header(number)
1763    }
1764
1765    fn sealed_headers_while(
1766        &self,
1767        range: impl RangeBounds<BlockNumber>,
1768        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1769    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1770        self.static_file_provider.sealed_headers_while(range, predicate)
1771    }
1772}
1773
// Block hash lookups are answered entirely by the static file provider.
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Returns the hash the static file provider has for block `number`, if any.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Forwards `start` and `end` to the static file provider's `canonical_hashes_range`.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1787
1788impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1789    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1790        let best_number = self.best_block_number()?;
1791        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1792        Ok(ChainInfo { best_hash, best_number })
1793    }
1794
1795    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1796        // The best block number is tracked via the finished stage which gets updated in the same tx
1797        // when new blocks committed
1798        Ok(self
1799            .get_stage_checkpoint(StageId::Finish)?
1800            .map(|checkpoint| checkpoint.block_number)
1801            .unwrap_or_default())
1802    }
1803
1804    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1805        self.static_file_provider.last_block_number()
1806    }
1807
1808    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1809        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1810    }
1811}
1812
1813impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1814    type Block = BlockTy<N>;
1815
1816    fn find_block_by_hash(
1817        &self,
1818        hash: B256,
1819        source: BlockSource,
1820    ) -> ProviderResult<Option<Self::Block>> {
1821        if source.is_canonical() {
1822            self.block(hash.into())
1823        } else {
1824            Ok(None)
1825        }
1826    }
1827
1828    /// Returns the block with matching number from database.
1829    ///
1830    /// If the header for this block is not found, this returns `None`.
1831    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1832    /// will return None.
1833    ///
1834    /// Returns an error if the requested block is below the earliest available history.
1835    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1836        if let Some(number) = self.convert_hash_or_number(id)? {
1837            let earliest_available = self.static_file_provider.earliest_history_height();
1838            if number < earliest_available {
1839                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1840            }
1841
1842            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1843
1844            // If the body indices are not found, this means that the transactions either do not
1845            // exist in the database yet, or they do exit but are not indexed.
1846            // If they exist but are not indexed, we don't have enough
1847            // information to return the block anyways, so we return `None`.
1848            let Some(transactions) = self.transactions_by_block(number.into())? else {
1849                return Ok(None)
1850            };
1851
1852            let body = self
1853                .storage
1854                .reader()
1855                .read_block_bodies(self, vec![(&header, transactions)])?
1856                .pop()
1857                .ok_or(ProviderError::InvalidStorageOutput)?;
1858
1859            return Ok(Some(Self::Block::new(header, body)))
1860        }
1861
1862        Ok(None)
1863    }
1864
1865    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1866        Ok(None)
1867    }
1868
1869    fn pending_block_and_receipts(
1870        &self,
1871    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1872        Ok(None)
1873    }
1874
1875    /// Returns the block with senders with matching number or hash from database.
1876    ///
1877    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1878    /// spot, and we want fast querying.**
1879    ///
1880    /// If the header for this block is not found, this returns `None`.
1881    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1882    /// will return None.
1883    fn recovered_block(
1884        &self,
1885        id: BlockHashOrNumber,
1886        transaction_kind: TransactionVariant,
1887    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1888        self.recovered_block(
1889            id,
1890            transaction_kind,
1891            |block_number| self.header_by_number(block_number),
1892            |header, body, senders| {
1893                Self::Block::new(header, body)
1894                    // Note: we're using unchecked here because we know the block contains valid txs
1895                    // wrt to its height and can ignore the s value check so pre
1896                    // EIP-2 txs are allowed
1897                    .try_into_recovered_unchecked(senders)
1898                    .map(Some)
1899                    .map_err(|_| ProviderError::SenderRecoveryError)
1900            },
1901        )
1902    }
1903
1904    fn sealed_block_with_senders(
1905        &self,
1906        id: BlockHashOrNumber,
1907        transaction_kind: TransactionVariant,
1908    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1909        self.recovered_block(
1910            id,
1911            transaction_kind,
1912            |block_number| self.sealed_header(block_number),
1913            |header, body, senders| {
1914                Self::Block::new_sealed(header, body)
1915                    // Note: we're using unchecked here because we know the block contains valid txs
1916                    // wrt to its height and can ignore the s value check so pre
1917                    // EIP-2 txs are allowed
1918                    .try_with_senders_unchecked(senders)
1919                    .map(Some)
1920                    .map_err(|_| ProviderError::SenderRecoveryError)
1921            },
1922        )
1923    }
1924
1925    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1926        self.block_range(
1927            range,
1928            |range| self.headers_range(range),
1929            |header, body, _| Ok(Self::Block::new(header, body)),
1930        )
1931    }
1932
1933    fn block_with_senders_range(
1934        &self,
1935        range: RangeInclusive<BlockNumber>,
1936    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1937        self.block_with_senders_range(
1938            range,
1939            |range| self.headers_range(range),
1940            |header, body, senders| {
1941                Self::Block::new(header, body)
1942                    .try_into_recovered_unchecked(senders)
1943                    .map_err(|_| ProviderError::SenderRecoveryError)
1944            },
1945        )
1946    }
1947
1948    fn recovered_block_range(
1949        &self,
1950        range: RangeInclusive<BlockNumber>,
1951    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1952        self.block_with_senders_range(
1953            range,
1954            |range| self.sealed_headers_range(range),
1955            |header, body, senders| {
1956                Self::Block::new_sealed(header, body)
1957                    .try_with_senders(senders)
1958                    .map_err(|_| ProviderError::SenderRecoveryError)
1959            },
1960        )
1961    }
1962
1963    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1964        Ok(self
1965            .tx
1966            .cursor_read::<tables::TransactionBlocks>()?
1967            .seek(id)
1968            .map(|b| b.map(|(_, bn)| bn))?)
1969    }
1970}
1971
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Returns `(hash, number)` pairs for the transactions in `tx_range`.
    ///
    /// This method only delegates to the static file provider.
    ///
    /// NOTE(review): the previous comment stated that hashes are recovered by walking the
    /// `Transactions` table, calculated in parallel, and returned unsorted. None of that is
    /// visible here; confirm against the static file provider implementation.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1984
1985// Calculates the hash of the given transaction
1986impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1987    type Transaction = TxTy<N>;
1988
1989    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1990        self.with_rocksdb_snapshot(|rocksdb_ref| {
1991            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1992            reader.get_transaction_hash_number(tx_hash)
1993        })
1994    }
1995
1996    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1997        self.static_file_provider.transaction_by_id(id)
1998    }
1999
2000    fn transaction_by_id_unhashed(
2001        &self,
2002        id: TxNumber,
2003    ) -> ProviderResult<Option<Self::Transaction>> {
2004        self.static_file_provider.transaction_by_id_unhashed(id)
2005    }
2006
2007    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2008        if let Some(id) = self.transaction_id(hash)? {
2009            Ok(self.transaction_by_id_unhashed(id)?)
2010        } else {
2011            Ok(None)
2012        }
2013    }
2014
2015    fn transaction_by_hash_with_meta(
2016        &self,
2017        tx_hash: TxHash,
2018    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2019        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
2020            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
2021            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
2022            let Some(sealed_header) = self.sealed_header(block_number)?
2023        {
2024            let (header, block_hash) = sealed_header.split();
2025            if let Some(block_body) = self.block_body_indices(block_number)? {
2026                // the index of the tx in the block is the offset:
2027                // len([start..tx_id])
2028                // NOTE: `transaction_id` is always `>=` the block's first
2029                // index
2030                let index = transaction_id - block_body.first_tx_num();
2031
2032                let meta = TransactionMeta {
2033                    tx_hash,
2034                    index,
2035                    block_hash,
2036                    block_number,
2037                    base_fee: header.base_fee_per_gas(),
2038                    excess_blob_gas: header.excess_blob_gas(),
2039                    timestamp: header.timestamp(),
2040                };
2041
2042                return Ok(Some((transaction, meta)))
2043            }
2044        }
2045
2046        Ok(None)
2047    }
2048
2049    fn transactions_by_block(
2050        &self,
2051        id: BlockHashOrNumber,
2052    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2053        if let Some(block_number) = self.convert_hash_or_number(id)? &&
2054            let Some(body) = self.block_body_indices(block_number)?
2055        {
2056            let tx_range = body.tx_num_range();
2057            return if tx_range.is_empty() {
2058                Ok(Some(Vec::new()))
2059            } else {
2060                self.transactions_by_tx_range(tx_range).map(Some)
2061            }
2062        }
2063        Ok(None)
2064    }
2065
2066    fn transactions_by_block_range(
2067        &self,
2068        range: impl RangeBounds<BlockNumber>,
2069    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2070        let range = to_range(range);
2071
2072        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2073            .into_iter()
2074            .map(|body| {
2075                let tx_num_range = body.tx_num_range();
2076                if tx_num_range.is_empty() {
2077                    Ok(Vec::new())
2078                } else {
2079                    self.transactions_by_tx_range(tx_num_range)
2080                }
2081            })
2082            .collect()
2083    }
2084
2085    fn transactions_by_tx_range(
2086        &self,
2087        range: impl RangeBounds<TxNumber>,
2088    ) -> ProviderResult<Vec<Self::Transaction>> {
2089        self.static_file_provider.transactions_by_tx_range(range)
2090    }
2091
2092    fn senders_by_tx_range(
2093        &self,
2094        range: impl RangeBounds<TxNumber>,
2095    ) -> ProviderResult<Vec<Address>> {
2096        if EitherWriterDestination::senders(self).is_static_file() {
2097            self.static_file_provider.senders_by_tx_range(range)
2098        } else {
2099            self.cursor_read_collect::<tables::TransactionSenders>(range)
2100        }
2101    }
2102
2103    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2104        if EitherWriterDestination::senders(self).is_static_file() {
2105            self.static_file_provider.transaction_sender(id)
2106        } else {
2107            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2108        }
2109    }
2110}
2111
2112impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2113    type Receipt = ReceiptTy<N>;
2114
2115    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2116        self.static_file_provider.get_with_static_file_or_database(
2117            StaticFileSegment::Receipts,
2118            id,
2119            |static_file| static_file.receipt(id),
2120            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2121        )
2122    }
2123
2124    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2125        if let Some(id) = self.transaction_id(hash)? {
2126            self.receipt(id)
2127        } else {
2128            Ok(None)
2129        }
2130    }
2131
2132    fn receipts_by_block(
2133        &self,
2134        block: BlockHashOrNumber,
2135    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2136        if let Some(number) = self.convert_hash_or_number(block)? &&
2137            let Some(body) = self.block_body_indices(number)?
2138        {
2139            let tx_range = body.tx_num_range();
2140            return if tx_range.is_empty() {
2141                Ok(Some(Vec::new()))
2142            } else {
2143                self.receipts_by_tx_range(tx_range).map(Some)
2144            }
2145        }
2146        Ok(None)
2147    }
2148
2149    fn receipts_by_tx_range(
2150        &self,
2151        range: impl RangeBounds<TxNumber>,
2152    ) -> ProviderResult<Vec<Self::Receipt>> {
2153        self.static_file_provider.get_range_with_static_file_or_database(
2154            StaticFileSegment::Receipts,
2155            to_range(range),
2156            |static_file, range, _| static_file.receipts_by_tx_range(range),
2157            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2158            |_| true,
2159        )
2160    }
2161
2162    fn receipts_by_block_range(
2163        &self,
2164        block_range: RangeInclusive<BlockNumber>,
2165    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2166        if block_range.is_empty() {
2167            return Ok(Vec::new());
2168        }
2169
2170        // collect block body indices for each block in the range
2171        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2172        let mut block_body_indices = Vec::with_capacity(range_len);
2173        for block_num in block_range {
2174            if let Some(indices) = self.block_body_indices(block_num)? {
2175                block_body_indices.push(indices);
2176            } else {
2177                // use default indices for missing blocks (empty block)
2178                block_body_indices.push(StoredBlockBodyIndices::default());
2179            }
2180        }
2181
2182        if block_body_indices.is_empty() {
2183            return Ok(Vec::new());
2184        }
2185
2186        // find blocks with transactions to determine transaction range
2187        let non_empty_blocks: Vec<_> =
2188            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2189
2190        if non_empty_blocks.is_empty() {
2191            // all blocks are empty
2192            return Ok(vec![Vec::new(); block_body_indices.len()]);
2193        }
2194
2195        // calculate the overall transaction range
2196        let first_tx = non_empty_blocks[0].first_tx_num();
2197        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2198
2199        // fetch all receipts in the transaction range
2200        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2201        let mut receipts_iter = all_receipts.into_iter();
2202
2203        // distribute receipts to their respective blocks
2204        let mut result = Vec::with_capacity(block_body_indices.len());
2205        for indices in &block_body_indices {
2206            if indices.tx_count == 0 {
2207                result.push(Vec::new());
2208            } else {
2209                let block_receipts =
2210                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2211                result.push(block_receipts);
2212            }
2213        }
2214
2215        Ok(result)
2216    }
2217}
2218
// Block body indices are read from the `BlockBodyIndices` database table.
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    /// Returns the body indices stored for block `num`, or `None` when the table has no entry
    /// for it.
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
    }

    /// Collects the body indices for `range` through `cursor_read_collect`.
    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
    }
}
2233
2234impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2235    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2236        Ok(if let Some(encoded) = id.get_pre_encoded() {
2237            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2238        } else {
2239            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2240        })
2241    }
2242
2243    /// Get stage checkpoint progress.
2244    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2245        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2246    }
2247
2248    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2249        self.tx
2250            .cursor_read::<tables::StageCheckpoints>()?
2251            .walk(None)?
2252            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2253            .map_err(ProviderError::Database)
2254    }
2255}
2256
2257impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2258    /// Save stage checkpoint.
2259    fn save_stage_checkpoint(
2260        &self,
2261        id: StageId,
2262        checkpoint: StageCheckpoint,
2263    ) -> ProviderResult<()> {
2264        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2265    }
2266
2267    /// Save stage checkpoint progress.
2268    fn save_stage_checkpoint_progress(
2269        &self,
2270        id: StageId,
2271        checkpoint: Vec<u8>,
2272    ) -> ProviderResult<()> {
2273        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2274    }
2275
2276    #[instrument(level = "debug", target = "providers::db", skip_all)]
2277    fn update_pipeline_stages(
2278        &self,
2279        block_number: BlockNumber,
2280        drop_stage_checkpoint: bool,
2281    ) -> ProviderResult<()> {
2282        // iterate over all existing stages in the table and update its progress.
2283        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2284        for stage_id in StageId::ALL {
2285            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2286            cursor.upsert(
2287                stage_id.to_string(),
2288                &StageCheckpoint {
2289                    block_number,
2290                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2291                },
2292            )?;
2293        }
2294
2295        Ok(())
2296    }
2297}
2298
2299impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2300    fn plain_state_storages(
2301        &self,
2302        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2303    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2304        if self.cached_storage_settings().use_hashed_state() {
2305            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2306
2307            addresses_with_keys
2308                .into_iter()
2309                .map(|(address, storage)| {
2310                    let hashed_address = keccak256(address);
2311                    storage
2312                        .into_iter()
2313                        .map(|key| -> ProviderResult<_> {
2314                            let hashed_key = keccak256(key);
2315                            let value = hashed_storage
2316                                .seek_by_key_subkey(hashed_address, hashed_key)?
2317                                .filter(|v| v.key == hashed_key)
2318                                .map(|v| v.value)
2319                                .unwrap_or_default();
2320                            Ok(StorageEntry { key, value })
2321                        })
2322                        .collect::<ProviderResult<Vec<_>>>()
2323                        .map(|storage| (address, storage))
2324                })
2325                .collect::<ProviderResult<Vec<(_, _)>>>()
2326        } else {
2327            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2328
2329            addresses_with_keys
2330                .into_iter()
2331                .map(|(address, storage)| {
2332                    storage
2333                        .into_iter()
2334                        .map(|key| -> ProviderResult<_> {
2335                            Ok(plain_storage
2336                                .seek_by_key_subkey(address, key)?
2337                                .filter(|v| v.key == key)
2338                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2339                        })
2340                        .collect::<ProviderResult<Vec<_>>>()
2341                        .map(|storage| (address, storage))
2342                })
2343                .collect::<ProviderResult<Vec<(_, _)>>>()
2344        }
2345    }
2346
2347    fn changed_storages_with_range(
2348        &self,
2349        range: RangeInclusive<BlockNumber>,
2350    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2351        if self.cached_storage_settings().storage_v2 {
2352            self.storage_changesets_range(range)?.into_iter().try_fold(
2353                BTreeMap::new(),
2354                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2355                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2356                    accounts.entry(address).or_default().insert(storage_entry.key);
2357                    Ok(accounts)
2358                },
2359            )
2360        } else {
2361            self.tx
2362                .cursor_read::<tables::StorageChangeSets>()?
2363                .walk_range(BlockNumberAddress::range(range))?
2364                // fold all storages and save its old state so we can remove it from HashedStorage
2365                // it is needed as it is dup table.
2366                .try_fold(
2367                    BTreeMap::new(),
2368                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2369                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2370                        accounts.entry(address).or_default().insert(storage_entry.key);
2371                        Ok(accounts)
2372                    },
2373                )
2374        }
2375    }
2376
2377    fn changed_storages_and_blocks_with_range(
2378        &self,
2379        range: RangeInclusive<BlockNumber>,
2380    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2381        if self.cached_storage_settings().storage_v2 {
2382            self.storage_changesets_range(range)?.into_iter().try_fold(
2383                BTreeMap::new(),
2384                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2385                    storages
2386                        .entry((index.address(), storage.key))
2387                        .or_default()
2388                        .push(index.block_number());
2389                    Ok(storages)
2390                },
2391            )
2392        } else {
2393            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2394
2395            let storage_changeset_lists =
2396                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2397                    BTreeMap::new(),
2398                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2399                     entry|
2400                     -> ProviderResult<_> {
2401                        let (index, storage) = entry?;
2402                        storages
2403                            .entry((index.address(), storage.key))
2404                            .or_default()
2405                            .push(index.block_number());
2406                        Ok(storages)
2407                    },
2408                )?;
2409
2410            Ok(storage_changeset_lists)
2411        }
2412    }
2413}
2414
2415impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2416    for DatabaseProvider<TX, N>
2417{
2418    type Receipt = ReceiptTy<N>;
2419
2420    #[instrument(level = "debug", target = "providers::db", skip_all)]
2421    fn write_state<'a>(
2422        &self,
2423        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2424        is_value_known: OriginalValuesKnown,
2425        config: StateWriteConfig,
2426    ) -> ProviderResult<()> {
2427        let execution_outcome = execution_outcome.into();
2428
2429        if self.cached_storage_settings().use_hashed_state() &&
2430            !config.write_receipts &&
2431            !config.write_account_changesets &&
2432            !config.write_storage_changesets
2433        {
2434            // In storage v2 with all outputs directed to static files, plain state and changesets
2435            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
2436            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
2437            self.write_bytecodes(
2438                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
2439            )?;
2440            return Ok(());
2441        }
2442
2443        let first_block = execution_outcome.first_block();
2444        let (plain_state, reverts) =
2445            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2446
2447        self.write_state_reverts(reverts, first_block, config)?;
2448        self.write_state_changes(plain_state)?;
2449
2450        if !config.write_receipts {
2451            return Ok(());
2452        }
2453
2454        let block_count = execution_outcome.len() as u64;
2455        let last_block = execution_outcome.last_block();
2456        let block_range = first_block..=last_block;
2457
2458        let tip = self.last_block_number()?.max(last_block);
2459
2460        // Fetch the first transaction number for each block in the range
2461        let block_indices: Vec<_> = self
2462            .block_body_indices_range(block_range)?
2463            .into_iter()
2464            .map(|b| b.first_tx_num)
2465            .collect();
2466
2467        // Ensure all expected blocks are present.
2468        if block_indices.len() < block_count as usize {
2469            let missing_blocks = block_count - block_indices.len() as u64;
2470            return Err(ProviderError::BlockBodyIndicesNotFound(
2471                last_block.saturating_sub(missing_blocks - 1),
2472            ));
2473        }
2474
2475        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2476
2477        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2478        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2479
2480        // All receipts from the last 128 blocks are required for blockchain tree, even with
2481        // [`PruneSegment::ContractLogs`].
2482        //
2483        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2484        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2485        // files yet. Once receipts exist in static files, we must continue writing to maintain
2486        // continuity and have no gaps.
2487        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2488            self.static_file_provider()
2489                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2490                .is_none()) &&
2491            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2492
2493        // Prepare set of addresses which logs should not be pruned.
2494        let mut allowed_addresses: AddressSet = AddressSet::default();
2495        for (_, addresses) in contract_log_pruner.range(..first_block) {
2496            allowed_addresses.extend(addresses.iter().copied());
2497        }
2498
2499        for (idx, (receipts, first_tx_index)) in
2500            execution_outcome.receipts().zip(block_indices).enumerate()
2501        {
2502            let block_number = first_block + idx as u64;
2503
2504            // Increment block number for receipts static file writer
2505            receipts_writer.increment_block(block_number)?;
2506
2507            // Skip writing receipts if pruning configuration requires us to.
2508            if prunable_receipts &&
2509                self.prune_modes
2510                    .receipts
2511                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2512            {
2513                continue
2514            }
2515
2516            // If there are new addresses to retain after this block number, track them
2517            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2518                allowed_addresses.extend(new_addresses.iter().copied());
2519            }
2520
2521            for (idx, receipt) in receipts.iter().enumerate() {
2522                let receipt_idx = first_tx_index + idx as u64;
2523                // Skip writing receipt if log filter is active and it does not have any logs to
2524                // retain
2525                if prunable_receipts &&
2526                    has_contract_log_filter &&
2527                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2528                {
2529                    continue
2530                }
2531
2532                receipts_writer.append_receipt(receipt_idx, receipt)?;
2533            }
2534        }
2535
2536        Ok(())
2537    }
2538
2539    fn write_state_reverts(
2540        &self,
2541        reverts: PlainStateReverts,
2542        first_block: BlockNumber,
2543        config: StateWriteConfig,
2544    ) -> ProviderResult<()> {
2545        // Write storage changes
2546        if config.write_storage_changesets {
2547            tracing::trace!("Writing storage changes");
2548            let mut storages_cursor =
2549                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2550            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2551                let block_number = first_block + block_index as BlockNumber;
2552
2553                tracing::trace!(block_number, "Writing block change");
2554                // sort changes by address.
2555                storage_changes.par_sort_unstable_by_key(|a| a.address);
2556                let total_changes =
2557                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2558                let mut changeset = Vec::with_capacity(total_changes);
2559                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2560                    let mut storage = storage_revert
2561                        .into_iter()
2562                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2563                        .collect::<Vec<_>>();
2564                    // sort storage slots by key.
2565                    storage.par_sort_unstable_by_key(|a| a.0);
2566
2567                    // If we are writing the primary storage wipe transition, the pre-existing
2568                    // storage state has to be taken from the database and written to storage
2569                    // history. See [StorageWipe::Primary] for more details.
2570                    //
2571                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
2572                    // require collecting wiped entries into a Vec like this, see
2573                    // `write_storage_trie_changesets`.
2574                    let mut wiped_storage = Vec::new();
2575                    if wiped {
2576                        tracing::trace!(?address, "Wiping storage");
2577                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2578                            wiped_storage.push((entry.key, entry.value));
2579                            while let Some(entry) = storages_cursor.next_dup_val()? {
2580                                wiped_storage.push((entry.key, entry.value))
2581                            }
2582                        }
2583                    }
2584
2585                    tracing::trace!(?address, ?storage, "Writing storage reverts");
2586                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2587                        changeset.push(StorageBeforeTx { address, key, value });
2588                    }
2589                }
2590
2591                let mut storage_changesets_writer =
2592                    EitherWriter::new_storage_changesets(self, block_number)?;
2593                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2594            }
2595        }
2596
2597        if !config.write_account_changesets {
2598            return Ok(());
2599        }
2600
2601        // Write account changes
2602        tracing::trace!(?first_block, "Writing account changes");
2603        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2604            let block_number = first_block + block_index as BlockNumber;
2605            let changeset = account_block_reverts
2606                .into_iter()
2607                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2608                .collect::<Vec<_>>();
2609            let mut account_changesets_writer =
2610                EitherWriter::new_account_changesets(self, block_number)?;
2611
2612            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2613        }
2614
2615        Ok(())
2616    }
2617
2618    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2619        // sort all entries so they can be written to database in more performant way.
2620        // and take smaller memory footprint.
2621        changes.accounts.par_sort_by_key(|a| a.0);
2622        changes.storage.par_sort_by_key(|a| a.address);
2623        changes.contracts.par_sort_by_key(|a| a.0);
2624
2625        if !self.cached_storage_settings().use_hashed_state() {
2626            // Write new account state
2627            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2628            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2629            // write account to database.
2630            for (address, account) in changes.accounts {
2631                if let Some(account) = account {
2632                    tracing::trace!(?address, "Updating plain state account");
2633                    accounts_cursor.upsert(address, &account.into())?;
2634                } else if accounts_cursor.seek_exact(address)?.is_some() {
2635                    tracing::trace!(?address, "Deleting plain state account");
2636                    accounts_cursor.delete_current()?;
2637                }
2638            }
2639
2640            // Write new storage state and wipe storage if needed.
2641            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2642            let mut storages_cursor =
2643                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2644            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2645                // Wiping of storage.
2646                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2647                    storages_cursor.delete_current_duplicates()?;
2648                }
2649                // cast storages to B256.
2650                let mut storage = storage
2651                    .into_iter()
2652                    .map(|(k, value)| StorageEntry { key: k.into(), value })
2653                    .collect::<Vec<_>>();
2654                // sort storage slots by key.
2655                storage.par_sort_unstable_by_key(|a| a.key);
2656
2657                for entry in storage {
2658                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2659                    if let Some(db_entry) =
2660                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2661                        db_entry.key == entry.key
2662                    {
2663                        storages_cursor.delete_current()?;
2664                    }
2665
2666                    if !entry.value.is_zero() {
2667                        storages_cursor.upsert(address, &entry)?;
2668                    }
2669                }
2670            }
2671        }
2672
2673        // Write bytecode
2674        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2675        self.write_bytecodes(
2676            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2677        )?;
2678
2679        Ok(())
2680    }
2681
2682    #[instrument(level = "debug", target = "providers::db", skip_all)]
2683    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2684        // Write hashed account updates.
2685        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2686        for (hashed_address, account) in hashed_state.accounts() {
2687            if let Some(account) = account {
2688                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2689            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2690                hashed_accounts_cursor.delete_current()?;
2691            }
2692        }
2693
2694        // Write hashed storage changes.
2695        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2696        let mut hashed_storage_cursor =
2697            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2698        for (hashed_address, storage) in sorted_storages {
2699            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2700                hashed_storage_cursor.delete_current_duplicates()?;
2701            }
2702
2703            for (hashed_slot, value) in storage.storage_slots_ref() {
2704                let entry = StorageEntry { key: *hashed_slot, value: *value };
2705
2706                if let Some(db_entry) =
2707                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2708                    db_entry.key == entry.key
2709                {
2710                    hashed_storage_cursor.delete_current()?;
2711                }
2712
2713                if !entry.value.is_zero() {
2714                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2715                }
2716            }
2717        }
2718
2719        Ok(())
2720    }
2721
2722    /// Remove the last N blocks of state.
2723    ///
2724    /// The latest state will be unwound
2725    ///
2726    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2727    ///    transaction ids.
2728    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2729    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2730    ///    the changesets.
2731    ///    - In order to have both the old and new values in the changesets, we also access the
2732    ///      plain state tables.
2733    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2734    ///    we:
2735    ///     1. Take the old value from the changeset
2736    ///     2. Take the new value from the plain state
2737    ///     3. Save the old value to the local state
2738    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2739    ///    have seen before we:
2740    ///     1. Take the old value from the changeset
2741    ///     2. Take the new value from the local state
2742    ///     3. Set the local state to the value in the changeset
2743    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2744        let range = block + 1..=self.last_block_number()?;
2745
2746        if range.is_empty() {
2747            return Ok(());
2748        }
2749
2750        // We are not removing block meta as it is used to get block changesets.
2751        let block_bodies = self.block_body_indices_range(range.clone())?;
2752
2753        // get transaction receipts
2754        let from_transaction_num =
2755            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2756
2757        let storage_range = BlockNumberAddress::range(range.clone());
2758        let storage_changeset = if self.cached_storage_settings().storage_v2 {
2759            let changesets = self.storage_changesets_range(range.clone())?;
2760            let mut changeset_writer =
2761                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2762            changeset_writer.prune_storage_changesets(block)?;
2763            changesets
2764        } else {
2765            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2766        };
2767        let account_changeset = if self.cached_storage_settings().storage_v2 {
2768            let changesets = self.account_changesets_range(range)?;
2769            let mut changeset_writer =
2770                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2771            changeset_writer.prune_account_changesets(block)?;
2772            changesets
2773        } else {
2774            self.take::<tables::AccountChangeSets>(range)?
2775        };
2776
2777        if self.cached_storage_settings().use_hashed_state() {
2778            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2779            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2780
2781            let (state, _) = self.populate_bundle_state_hashed(
2782                account_changeset,
2783                storage_changeset,
2784                &mut hashed_accounts_cursor,
2785                &mut hashed_storage_cursor,
2786            )?;
2787
2788            for (address, (old_account, new_account, storage)) in &state {
2789                if old_account != new_account {
2790                    let hashed_address = keccak256(address);
2791                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2792                    if let Some(account) = old_account {
2793                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2794                    } else if existing_entry.is_some() {
2795                        hashed_accounts_cursor.delete_current()?;
2796                    }
2797                }
2798
2799                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2800                    let hashed_address = keccak256(address);
2801                    let hashed_storage_key = keccak256(storage_key);
2802                    let storage_entry =
2803                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2804                    if hashed_storage_cursor
2805                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2806                        .is_some_and(|s| s.key == hashed_storage_key)
2807                    {
2808                        hashed_storage_cursor.delete_current()?
2809                    }
2810
2811                    if !old_storage_value.is_zero() {
2812                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2813                    }
2814                }
2815            }
2816        } else {
2817            // This is not working for blocks that are not at tip. as plain state is not the last
2818            // state of end range. We should rename the functions or add support to access
2819            // History state. Accessing history state can be tricky but we are not gaining
2820            // anything.
2821            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2822            let mut plain_storage_cursor =
2823                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2824
2825            let (state, _) = self.populate_bundle_state_plain(
2826                account_changeset,
2827                storage_changeset,
2828                &mut plain_accounts_cursor,
2829                &mut plain_storage_cursor,
2830            )?;
2831
2832            for (address, (old_account, new_account, storage)) in &state {
2833                if old_account != new_account {
2834                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2835                    if let Some(account) = old_account {
2836                        plain_accounts_cursor.upsert(*address, account)?;
2837                    } else if existing_entry.is_some() {
2838                        plain_accounts_cursor.delete_current()?;
2839                    }
2840                }
2841
2842                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2843                    let storage_entry =
2844                        StorageEntry { key: *storage_key, value: *old_storage_value };
2845                    if plain_storage_cursor
2846                        .seek_by_key_subkey(*address, *storage_key)?
2847                        .is_some_and(|s| s.key == *storage_key)
2848                    {
2849                        plain_storage_cursor.delete_current()?
2850                    }
2851
2852                    if !old_storage_value.is_zero() {
2853                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2854                    }
2855                }
2856            }
2857        }
2858
2859        self.remove_receipts_from(from_transaction_num, block)?;
2860
2861        Ok(())
2862    }
2863
2864    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2865    ///
2866    /// The latest state will be unwound and returned back with all the blocks
2867    ///
2868    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2869    ///    transaction ids.
2870    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2871    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2872    ///    the changesets.
2873    ///    - In order to have both the old and new values in the changesets, we also access the
2874    ///      plain state tables.
2875    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2876    ///    we:
2877    ///     1. Take the old value from the changeset
2878    ///     2. Take the new value from the plain state
2879    ///     3. Save the old value to the local state
2880    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2881    ///    have seen before we:
2882    ///     1. Take the old value from the changeset
2883    ///     2. Take the new value from the local state
2884    ///     3. Set the local state to the value in the changeset
2885    fn take_state_above(
2886        &self,
2887        block: BlockNumber,
2888    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2889        let range = block + 1..=self.last_block_number()?;
2890
2891        if range.is_empty() {
2892            return Ok(ExecutionOutcome::default())
2893        }
2894        let start_block_number = *range.start();
2895
2896        // We are not removing block meta as it is used to get block changesets.
2897        let block_bodies = self.block_body_indices_range(range.clone())?;
2898
2899        // get transaction receipts
2900        let from_transaction_num =
2901            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2902        let to_transaction_num =
2903            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2904
2905        let storage_range = BlockNumberAddress::range(range.clone());
2906        let storage_changeset = if let Some(highest_block) = self
2907            .static_file_provider
2908            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2909            self.cached_storage_settings().storage_v2
2910        {
2911            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2912            let mut changeset_writer =
2913                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2914            changeset_writer.prune_storage_changesets(block)?;
2915            changesets
2916        } else {
2917            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2918        };
2919
2920        // if there are static files for this segment, prune them.
2921        let highest_changeset_block = self
2922            .static_file_provider
2923            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2924        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2925            self.cached_storage_settings().storage_v2
2926        {
2927            // TODO: add a `take` method that removes and returns the items instead of doing this
2928            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2929            let mut changeset_writer =
2930                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2931            changeset_writer.prune_account_changesets(block)?;
2932
2933            changesets
2934        } else {
2935            // Have to remove from static files if they exist, otherwise remove using `take` for the
2936            // changeset tables
2937            self.take::<tables::AccountChangeSets>(range)?
2938        };
2939
2940        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2941            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2942            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2943
2944            let (state, reverts) = self.populate_bundle_state_hashed(
2945                account_changeset,
2946                storage_changeset,
2947                &mut hashed_accounts_cursor,
2948                &mut hashed_storage_cursor,
2949            )?;
2950
2951            for (address, (old_account, new_account, storage)) in &state {
2952                if old_account != new_account {
2953                    let hashed_address = keccak256(address);
2954                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2955                    if let Some(account) = old_account {
2956                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2957                    } else if existing_entry.is_some() {
2958                        hashed_accounts_cursor.delete_current()?;
2959                    }
2960                }
2961
2962                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2963                    let hashed_address = keccak256(address);
2964                    let hashed_storage_key = keccak256(storage_key);
2965                    let storage_entry =
2966                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2967                    if hashed_storage_cursor
2968                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2969                        .is_some_and(|s| s.key == hashed_storage_key)
2970                    {
2971                        hashed_storage_cursor.delete_current()?
2972                    }
2973
2974                    if !old_storage_value.is_zero() {
2975                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2976                    }
2977                }
2978            }
2979
2980            (state, reverts)
2981        } else {
2982            // This is not working for blocks that are not at tip. as plain state is not the last
2983            // state of end range. We should rename the functions or add support to access
2984            // History state. Accessing history state can be tricky but we are not gaining
2985            // anything.
2986            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2987            let mut plain_storage_cursor =
2988                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2989
2990            let (state, reverts) = self.populate_bundle_state_plain(
2991                account_changeset,
2992                storage_changeset,
2993                &mut plain_accounts_cursor,
2994                &mut plain_storage_cursor,
2995            )?;
2996
2997            for (address, (old_account, new_account, storage)) in &state {
2998                if old_account != new_account {
2999                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
3000                    if let Some(account) = old_account {
3001                        plain_accounts_cursor.upsert(*address, account)?;
3002                    } else if existing_entry.is_some() {
3003                        plain_accounts_cursor.delete_current()?;
3004                    }
3005                }
3006
3007                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
3008                    let storage_entry =
3009                        StorageEntry { key: *storage_key, value: *old_storage_value };
3010                    if plain_storage_cursor
3011                        .seek_by_key_subkey(*address, *storage_key)?
3012                        .is_some_and(|s| s.key == *storage_key)
3013                    {
3014                        plain_storage_cursor.delete_current()?
3015                    }
3016
3017                    if !old_storage_value.is_zero() {
3018                        plain_storage_cursor.upsert(*address, &storage_entry)?;
3019                    }
3020                }
3021            }
3022
3023            (state, reverts)
3024        };
3025
3026        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
3027        let mut receipts_iter = self
3028            .static_file_provider
3029            .get_range_with_static_file_or_database(
3030                StaticFileSegment::Receipts,
3031                from_transaction_num..to_transaction_num + 1,
3032                |static_file, range, _| {
3033                    static_file
3034                        .receipts_by_tx_range(range.clone())
3035                        .map(|r| range.into_iter().zip(r).collect())
3036                },
3037                |range, _| {
3038                    self.tx
3039                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
3040                        .walk_range(range)?
3041                        .map(|r| r.map_err(Into::into))
3042                        .collect()
3043                },
3044                |_| true,
3045            )?
3046            .into_iter()
3047            .peekable();
3048
3049        let mut receipts = Vec::with_capacity(block_bodies.len());
3050        // loop break if we are at the end of the blocks.
3051        for block_body in block_bodies {
3052            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3053            for num in block_body.tx_num_range() {
3054                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3055                    block_receipts.push(receipts_iter.next().unwrap().1);
3056                }
3057            }
3058            receipts.push(block_receipts);
3059        }
3060
3061        self.remove_receipts_from(from_transaction_num, block)?;
3062
3063        Ok(ExecutionOutcome::new_init(
3064            state,
3065            reverts,
3066            Vec::new(),
3067            receipts,
3068            start_block_number,
3069            Vec::new(),
3070        ))
3071    }
3072}
3073
3074impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3075    fn write_account_trie_updates<A: TrieTableAdapter>(
3076        tx: &TX,
3077        trie_updates: &TrieUpdatesSorted,
3078        num_entries: &mut usize,
3079    ) -> ProviderResult<()>
3080    where
3081        TX: DbTxMut,
3082    {
3083        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3084        // Process sorted account nodes
3085        for (key, updated_node) in trie_updates.account_nodes_ref() {
3086            let nibbles = A::AccountKey::from(*key);
3087            match updated_node {
3088                Some(node) => {
3089                    if !key.is_empty() {
3090                        *num_entries += 1;
3091                        account_trie_cursor.upsert(nibbles, node)?;
3092                    }
3093                }
3094                None => {
3095                    *num_entries += 1;
3096                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3097                        account_trie_cursor.delete_current()?;
3098                    }
3099                }
3100            }
3101        }
3102        Ok(())
3103    }
3104
3105    fn write_storage_tries<A: TrieTableAdapter>(
3106        tx: &TX,
3107        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3108        num_entries: &mut usize,
3109    ) -> ProviderResult<()>
3110    where
3111        TX: DbTxMut,
3112    {
3113        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3114        for (hashed_address, storage_trie_updates) in storage_tries {
3115            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3116                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3117            *num_entries +=
3118                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3119            cursor = db_storage_trie_cursor.cursor;
3120        }
3121        Ok(())
3122    }
3123}
3124
3125impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3126    /// Writes trie updates to the database with already sorted updates.
3127    ///
3128    /// Returns the number of entries modified.
3129    #[instrument(level = "debug", target = "providers::db", skip_all)]
3130    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3131        if trie_updates.is_empty() {
3132            return Ok(0)
3133        }
3134
3135        // Track the number of inserted entries.
3136        let mut num_entries = 0;
3137
3138        reth_trie_db::with_adapter!(self, |A| {
3139            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3140        });
3141
3142        num_entries +=
3143            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3144
3145        Ok(num_entries)
3146    }
3147}
3148
3149impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3150    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3151    ///
3152    /// Expects the storage trie updates to already be sorted by the hashed address key.
3153    ///
3154    /// Returns the number of entries modified.
3155    fn write_storage_trie_updates_sorted<'a>(
3156        &self,
3157        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3158    ) -> ProviderResult<usize> {
3159        let mut num_entries = 0;
3160        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3161        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3162        reth_trie_db::with_adapter!(self, |A| {
3163            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3164        });
3165        Ok(num_entries)
3166    }
3167}
3168
3169impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3170    fn unwind_account_hashing<'a>(
3171        &self,
3172        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3173    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3174        // Aggregate all block changesets and make a list of accounts that have been changed.
3175        // Note that collecting and then reversing the order is necessary to ensure that the
3176        // changes are applied in the correct order.
3177        let hashed_accounts = changesets
3178            .into_iter()
3179            .map(|(_, e)| (keccak256(e.address), e.info))
3180            .collect::<Vec<_>>()
3181            .into_iter()
3182            .rev()
3183            .collect::<BTreeMap<_, _>>();
3184
3185        // Apply values to HashedState, and remove the account if it's None.
3186        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3187        for (hashed_address, account) in &hashed_accounts {
3188            if let Some(account) = account {
3189                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3190            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3191                hashed_accounts_cursor.delete_current()?;
3192            }
3193        }
3194
3195        Ok(hashed_accounts)
3196    }
3197
3198    fn unwind_account_hashing_range(
3199        &self,
3200        range: impl RangeBounds<BlockNumber>,
3201    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3202        let changesets = self.account_changesets_range(range)?;
3203        self.unwind_account_hashing(changesets.iter())
3204    }
3205
3206    fn insert_account_for_hashing(
3207        &self,
3208        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3209    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3210        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3211        let hashed_accounts =
3212            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3213        for (hashed_address, account) in &hashed_accounts {
3214            if let Some(account) = account {
3215                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3216            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3217                hashed_accounts_cursor.delete_current()?;
3218            }
3219        }
3220        Ok(hashed_accounts)
3221    }
3222
3223    fn unwind_storage_hashing(
3224        &self,
3225        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3226    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3227        // Aggregate all block changesets and make list of accounts that have been changed.
3228        let mut hashed_storages = changesets
3229            .into_iter()
3230            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3231                let hashed_key = keccak256(storage_entry.key);
3232                (keccak256(address), hashed_key, storage_entry.value)
3233            })
3234            .collect::<Vec<_>>();
3235        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3236
3237        // Apply values to HashedState, and remove the account if it's None.
3238        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3239            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3240        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3241        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3242            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3243
3244            if hashed_storage
3245                .seek_by_key_subkey(hashed_address, key)?
3246                .is_some_and(|entry| entry.key == key)
3247            {
3248                hashed_storage.delete_current()?;
3249            }
3250
3251            if !value.is_zero() {
3252                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3253            }
3254        }
3255        Ok(hashed_storage_keys)
3256    }
3257
3258    fn unwind_storage_hashing_range(
3259        &self,
3260        range: impl RangeBounds<BlockNumber>,
3261    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3262        let changesets = self.storage_changesets_range(range)?;
3263        self.unwind_storage_hashing(changesets.into_iter())
3264    }
3265
3266    fn insert_storage_for_hashing(
3267        &self,
3268        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3269    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3270        // hash values
3271        let hashed_storages =
3272            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3273                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3274                    map.insert(keccak256(entry.key), entry.value);
3275                    map
3276                });
3277                map.insert(keccak256(address), storage);
3278                map
3279            });
3280
3281        let hashed_storage_keys = hashed_storages
3282            .iter()
3283            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3284            .collect();
3285
3286        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3287        // Hash the address and key and apply them to HashedStorage (if Storage is None
3288        // just remove it);
3289        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3290            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3291                if hashed_storage_cursor
3292                    .seek_by_key_subkey(hashed_address, key)?
3293                    .is_some_and(|entry| entry.key == key)
3294                {
3295                    hashed_storage_cursor.delete_current()?;
3296                }
3297
3298                if !value.is_zero() {
3299                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3300                }
3301                Ok(())
3302            })
3303        })?;
3304
3305        Ok(hashed_storage_keys)
3306    }
3307}
3308
3309impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3310    fn unwind_account_history_indices<'a>(
3311        &self,
3312        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3313    ) -> ProviderResult<usize> {
3314        let mut last_indices = changesets
3315            .into_iter()
3316            .map(|(index, account)| (account.address, *index))
3317            .collect::<Vec<_>>();
3318        last_indices.sort_unstable_by_key(|(a, _)| *a);
3319
3320        if self.cached_storage_settings().storage_v2 {
3321            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3322            self.pending_rocksdb_batches.lock().push(batch);
3323        } else {
3324            // Unwind the account history index in MDBX.
3325            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3326            for &(address, rem_index) in &last_indices {
3327                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3328                    &mut cursor,
3329                    ShardedKey::last(address),
3330                    rem_index,
3331                    |sharded_key| sharded_key.key == address,
3332                )?;
3333
3334                // Check the last returned partial shard.
3335                // If it's not empty, the shard needs to be reinserted.
3336                if !partial_shard.is_empty() {
3337                    cursor.insert(
3338                        ShardedKey::last(address),
3339                        &BlockNumberList::new_pre_sorted(partial_shard),
3340                    )?;
3341                }
3342            }
3343        }
3344
3345        let changesets = last_indices.len();
3346        Ok(changesets)
3347    }
3348
3349    fn unwind_account_history_indices_range(
3350        &self,
3351        range: impl RangeBounds<BlockNumber>,
3352    ) -> ProviderResult<usize> {
3353        let changesets = self.account_changesets_range(range)?;
3354        self.unwind_account_history_indices(changesets.iter())
3355    }
3356
3357    fn insert_account_history_index(
3358        &self,
3359        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3360    ) -> ProviderResult<()> {
3361        self.append_history_index::<_, tables::AccountsHistory>(
3362            account_transitions,
3363            ShardedKey::new,
3364        )
3365    }
3366
3367    fn unwind_storage_history_indices(
3368        &self,
3369        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3370    ) -> ProviderResult<usize> {
3371        let mut storage_changesets = changesets
3372            .into_iter()
3373            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3374            .collect::<Vec<_>>();
3375        storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3376
3377        if self.cached_storage_settings().storage_v2 {
3378            let batch =
3379                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3380            self.pending_rocksdb_batches.lock().push(batch);
3381        } else {
3382            // Unwind the storage history index in MDBX.
3383            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3384            for &(address, storage_key, rem_index) in &storage_changesets {
3385                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3386                    &mut cursor,
3387                    StorageShardedKey::last(address, storage_key),
3388                    rem_index,
3389                    |storage_sharded_key| {
3390                        storage_sharded_key.address == address &&
3391                            storage_sharded_key.sharded_key.key == storage_key
3392                    },
3393                )?;
3394
3395                // Check the last returned partial shard.
3396                // If it's not empty, the shard needs to be reinserted.
3397                if !partial_shard.is_empty() {
3398                    cursor.insert(
3399                        StorageShardedKey::last(address, storage_key),
3400                        &BlockNumberList::new_pre_sorted(partial_shard),
3401                    )?;
3402                }
3403            }
3404        }
3405
3406        let changesets = storage_changesets.len();
3407        Ok(changesets)
3408    }
3409
3410    fn unwind_storage_history_indices_range(
3411        &self,
3412        range: impl RangeBounds<BlockNumber>,
3413    ) -> ProviderResult<usize> {
3414        let changesets = self.storage_changesets_range(range)?;
3415        self.unwind_storage_history_indices(changesets.into_iter())
3416    }
3417
3418    fn insert_storage_history_index(
3419        &self,
3420        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3421    ) -> ProviderResult<()> {
3422        self.append_history_index::<_, tables::StoragesHistory>(
3423            storage_transitions,
3424            |(address, storage_key), highest_block_number| {
3425                StorageShardedKey::new(address, storage_key, highest_block_number)
3426            },
3427        )
3428    }
3429
3430    #[instrument(level = "debug", target = "providers::db", skip_all)]
3431    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3432        let storage_settings = self.cached_storage_settings();
3433        if !storage_settings.storage_v2 {
3434            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3435            self.insert_account_history_index(indices)?;
3436        }
3437
3438        if !storage_settings.storage_v2 {
3439            let indices = self.changed_storages_and_blocks_with_range(range)?;
3440            self.insert_storage_history_index(indices)?;
3441        }
3442
3443        Ok(())
3444    }
3445}
3446
3447impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3448    for DatabaseProvider<TX, N>
3449{
3450    fn take_block_and_execution_above(
3451        &self,
3452        block: BlockNumber,
3453    ) -> ProviderResult<Chain<Self::Primitives>> {
3454        let range = block + 1..=self.last_block_number()?;
3455
3456        self.unwind_trie_state_from(block + 1)?;
3457
3458        // get execution res
3459        let execution_state = self.take_state_above(block)?;
3460
3461        let blocks = self.recovered_block_range(range)?;
3462
3463        // remove block bodies it is needed for both get block range and get block execution results
3464        // that is why it is deleted afterwards.
3465        self.remove_blocks_above(block)?;
3466
3467        // Update pipeline progress
3468        self.update_pipeline_stages(block, true)?;
3469
3470        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3471    }
3472
3473    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3474        self.unwind_trie_state_from(block + 1)?;
3475
3476        // remove execution res
3477        self.remove_state_above(block)?;
3478
3479        // remove block bodies it is needed for both get block range and get block execution results
3480        // that is why it is deleted afterwards.
3481        self.remove_blocks_above(block)?;
3482
3483        // Update pipeline progress
3484        self.update_pipeline_stages(block, true)?;
3485
3486        Ok(())
3487    }
3488}
3489
3490impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3491    for DatabaseProvider<TX, N>
3492{
3493    type Block = BlockTy<N>;
3494    type Receipt = ReceiptTy<N>;
3495
3496    /// Inserts the block into the database, writing to both static files and MDBX.
3497    ///
3498    /// This is a convenience method primarily used in tests. For production use,
3499    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3500    fn insert_block(
3501        &self,
3502        block: &RecoveredBlock<Self::Block>,
3503    ) -> ProviderResult<StoredBlockBodyIndices> {
3504        let block_number = block.number();
3505
3506        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3507        let executed_block = ExecutedBlock::new(
3508            Arc::new(block.clone()),
3509            Arc::new(BlockExecutionOutput {
3510                result: BlockExecutionResult {
3511                    receipts: Default::default(),
3512                    requests: Default::default(),
3513                    gas_used: 0,
3514                    blob_gas_used: 0,
3515                },
3516                state: Default::default(),
3517            }),
3518            ComputedTrieData::default(),
3519        );
3520
3521        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3522        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3523
3524        // Return the body indices
3525        self.block_body_indices(block_number)?
3526            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3527    }
3528
3529    fn append_block_bodies(
3530        &self,
3531        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3532    ) -> ProviderResult<()> {
3533        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3534
3535        // Initialize writer if we will be writing transactions to staticfiles
3536        let mut tx_writer =
3537            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3538
3539        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3540        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3541
3542        // Get id for the next tx_num or zero if there are no transactions.
3543        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3544
3545        for (block_number, body) in &bodies {
3546            // Increment block on static file header.
3547            tx_writer.increment_block(*block_number)?;
3548
3549            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3550            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3551
3552            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3553
3554            // insert block meta
3555            block_indices_cursor.append(*block_number, &block_indices)?;
3556
3557            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3558
3559            let Some(body) = body else { continue };
3560
3561            // write transaction block index
3562            if !body.transactions().is_empty() {
3563                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3564                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3565            }
3566
3567            // write transactions
3568            for transaction in body.transactions() {
3569                tx_writer.append_transaction(next_tx_num, transaction)?;
3570
3571                // Increment transaction id for each transaction.
3572                next_tx_num += 1;
3573            }
3574        }
3575
3576        self.storage.writer().write_block_bodies(self, bodies)?;
3577
3578        Ok(())
3579    }
3580
3581    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3582        let last_block_number = self.last_block_number()?;
3583        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3584        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3585            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3586        }
3587
3588        // Get highest static file block for the total block range
3589        let highest_static_file_block = self
3590            .static_file_provider()
3591            .get_highest_static_file_block(StaticFileSegment::Headers)
3592            .expect("todo: error handling, headers should exist");
3593
3594        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3595        // we remove only what is ABOVE the block.
3596        //
3597        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3598        // will have three blocks to remove, which will be block 8, 7, and 6.
3599        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3600        self.static_file_provider()
3601            .get_writer(block, StaticFileSegment::Headers)?
3602            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3603
3604        // First transaction to be removed
3605        let unwind_tx_from = self
3606            .block_body_indices(block)?
3607            .map(|b| b.next_tx_num())
3608            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3609
3610        // Last transaction to be removed
3611        let unwind_tx_to = self
3612            .tx
3613            .cursor_read::<tables::BlockBodyIndices>()?
3614            .last()?
3615            // shouldn't happen because this was OK above
3616            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3617            .1
3618            .last_tx_num();
3619
3620        if unwind_tx_from <= unwind_tx_to {
3621            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3622            self.with_rocksdb_batch(|batch| {
3623                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3624                for (hash, _) in hashes {
3625                    writer.delete_transaction_hash_number(hash)?;
3626                }
3627                Ok(((), writer.into_raw_rocksdb_batch()))
3628            })?;
3629        }
3630
3631        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
3632        // exists in static files or the database.
3633        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3634            EitherWriter::new_senders(self, last_block_number)?
3635                .prune_senders(unwind_tx_from, block)?;
3636        }
3637
3638        self.remove_bodies_above(block)?;
3639
3640        Ok(())
3641    }
3642
3643    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3644        self.storage.writer().remove_block_bodies_above(self, block)?;
3645
3646        // First transaction to be removed
3647        let unwind_tx_from = self
3648            .block_body_indices(block)?
3649            .map(|b| b.next_tx_num())
3650            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3651
3652        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3653        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3654
3655        let static_file_tx_num =
3656            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3657
3658        let to_delete = static_file_tx_num
3659            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3660            .unwrap_or_default();
3661
3662        self.static_file_provider
3663            .latest_writer(StaticFileSegment::Transactions)?
3664            .prune_transactions(to_delete, block)?;
3665
3666        Ok(())
3667    }
3668
3669    /// Appends blocks with their execution state to the database.
3670    ///
3671    /// **Note:** This function is only used in tests.
3672    ///
3673    /// History indices are written to the appropriate backend based on storage settings:
3674    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3675    ///
3676    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3677    fn append_blocks_with_state(
3678        &self,
3679        blocks: Vec<RecoveredBlock<Self::Block>>,
3680        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3681        hashed_state: HashedPostStateSorted,
3682    ) -> ProviderResult<()> {
3683        if blocks.is_empty() {
3684            debug!(target: "providers::db", "Attempted to append empty block range");
3685            return Ok(())
3686        }
3687
3688        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3689        // `None`.
3690        let first_number = blocks[0].number();
3691
3692        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3693        // `None`.
3694        let last_block_number = blocks[blocks.len() - 1].number();
3695
3696        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3697
3698        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3699        // This is necessary because with edge storage, changesets are written to static files
3700        // whose index isn't updated until commit, making them invisible to subsequent reads
3701        // within the same transaction.
3702        let (account_transitions, storage_transitions) = {
3703            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3704            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3705            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3706                let block_number = first_number + block_idx as u64;
3707                for (address, account_revert) in block_reverts {
3708                    account_transitions.entry(*address).or_default().push(block_number);
3709                    for storage_key in account_revert.storage.keys() {
3710                        let key = B256::from(storage_key.to_be_bytes());
3711                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3712                    }
3713                }
3714            }
3715            (account_transitions, storage_transitions)
3716        };
3717
3718        // Insert the blocks
3719        for block in blocks {
3720            self.insert_block(&block)?;
3721            durations_recorder.record_relative(metrics::Action::InsertBlock);
3722        }
3723
3724        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3725        durations_recorder.record_relative(metrics::Action::InsertState);
3726
3727        // insert hashes and intermediate merkle nodes
3728        self.write_hashed_state(&hashed_state)?;
3729        durations_recorder.record_relative(metrics::Action::InsertHashes);
3730
3731        // Use pre-computed transitions for history indices since static file
3732        // writes aren't visible until commit.
3733        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3734        // append_*_history_shard which handles read-merge-write internally.
3735        let storage_settings = self.cached_storage_settings();
3736        if storage_settings.storage_v2 {
3737            self.with_rocksdb_batch(|mut batch| {
3738                for (address, blocks) in account_transitions {
3739                    batch.append_account_history_shard(address, blocks)?;
3740                }
3741                Ok(((), Some(batch.into_inner())))
3742            })?;
3743        } else {
3744            self.insert_account_history_index(account_transitions)?;
3745        }
3746        if storage_settings.storage_v2 {
3747            self.with_rocksdb_batch(|mut batch| {
3748                for ((address, key), blocks) in storage_transitions {
3749                    batch.append_storage_history_shard(address, key, blocks)?;
3750                }
3751                Ok(((), Some(batch.into_inner())))
3752            })?;
3753        } else {
3754            self.insert_storage_history_index(storage_transitions)?;
3755        }
3756        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3757
3758        // Update pipeline progress
3759        self.update_pipeline_stages(last_block_number, false)?;
3760        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3761
3762        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3763
3764        Ok(())
3765    }
3766}
3767
3768impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3769    fn get_prune_checkpoint(
3770        &self,
3771        segment: PruneSegment,
3772    ) -> ProviderResult<Option<PruneCheckpoint>> {
3773        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3774    }
3775
3776    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3777        Ok(PruneSegment::variants()
3778            .filter_map(|segment| {
3779                self.tx
3780                    .get::<tables::PruneCheckpoints>(segment)
3781                    .transpose()
3782                    .map(|chk| chk.map(|chk| (segment, chk)))
3783            })
3784            .collect::<Result<_, _>>()?)
3785    }
3786}
3787
3788impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3789    fn save_prune_checkpoint(
3790        &self,
3791        segment: PruneSegment,
3792        checkpoint: PruneCheckpoint,
3793    ) -> ProviderResult<()> {
3794        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3795    }
3796}
3797
3798impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3799    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3800        let db_entries = self.tx.entries::<T>()?;
3801        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3802            Ok(entries) => entries,
3803            Err(ProviderError::UnsupportedProvider) => 0,
3804            Err(err) => return Err(err),
3805        };
3806
3807        Ok(db_entries + static_file_entries)
3808    }
3809}
3810
3811impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3812    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3813        let mut finalized_blocks = self
3814            .tx
3815            .cursor_read::<tables::ChainState>()?
3816            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3817            .take(1)
3818            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3819
3820        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3821        Ok(last_finalized_block_number)
3822    }
3823
3824    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3825        let mut finalized_blocks = self
3826            .tx
3827            .cursor_read::<tables::ChainState>()?
3828            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3829            .take(1)
3830            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3831
3832        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3833        Ok(last_finalized_block_number)
3834    }
3835}
3836
3837impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3838    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3839        Ok(self
3840            .tx
3841            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3842    }
3843
3844    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3845        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3846    }
3847}
3848
3849impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3850    type Tx = TX;
3851
3852    fn tx_ref(&self) -> &Self::Tx {
3853        &self.tx
3854    }
3855
3856    fn tx_mut(&mut self) -> &mut Self::Tx {
3857        &mut self.tx
3858    }
3859
3860    fn into_tx(self) -> Self::Tx {
3861        self.tx
3862    }
3863
3864    fn prune_modes_ref(&self) -> &PruneModes {
3865        self.prune_modes_ref()
3866    }
3867
3868    /// Commit database transaction, static files, and pending `RocksDB` batches.
3869    #[instrument(
3870        name = "DatabaseProvider::commit",
3871        level = "debug",
3872        target = "providers::db",
3873        skip_all
3874    )]
3875    fn commit(self) -> ProviderResult<()> {
3876        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3877            self.commit_unwind()?;
3878        } else {
3879            // Normal path: finalize() will call sync_all() if not already synced
3880            let mut timings = metrics::CommitTimings::default();
3881
3882            let start = Instant::now();
3883            self.static_file_provider.finalize()?;
3884            timings.sf = start.elapsed();
3885
3886            let start = Instant::now();
3887            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3888            for batch in batches {
3889                self.rocksdb_provider.commit_batch(batch)?;
3890            }
3891            timings.rocksdb = start.elapsed();
3892
3893            let start = Instant::now();
3894            self.tx.commit()?;
3895            timings.mdbx = start.elapsed();
3896
3897            self.metrics.record_commit(&timings);
3898        }
3899
3900        Ok(())
3901    }
3902}
3903
3904impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3905    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3906        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3907    }
3908}
3909
3910impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3911    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3912        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3913    }
3914}
3915
3916impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3917    fn cached_storage_settings(&self) -> StorageSettings {
3918        *self.storage_settings.read()
3919    }
3920
3921    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3922        *self.storage_settings.write() = settings;
3923    }
3924}
3925
3926impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3927    fn storage_path(&self) -> PathBuf {
3928        self.db_path.clone()
3929    }
3930}
3931
3932#[cfg(test)]
3933mod tests {
3934    use super::*;
3935    use crate::{
3936        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3937        BlockWriter,
3938    };
3939    use alloy_consensus::Header;
3940    use alloy_primitives::{
3941        map::{AddressMap, B256Map},
3942        U256,
3943    };
3944    use reth_chain_state::ExecutedBlock;
3945    use reth_db_api::models::StorageSettings;
3946    use reth_ethereum_primitives::Receipt;
3947    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3948    use reth_primitives_traits::SealedBlock;
3949    use reth_storage_api::MetadataWriter;
3950    use reth_testing_utils::generators::{self, random_block, BlockParams};
3951    use reth_trie::{
3952        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3953    };
3954    use revm_database::BundleState;
3955    use revm_state::AccountInfo;
3956    use std::{sync::mpsc, time::Duration};
3957
3958    #[test]
3959    fn test_receipts_by_block_range_empty_range() {
3960        let factory = create_test_provider_factory();
3961        let provider = factory.provider().unwrap();
3962
3963        // empty range should return empty vec
3964        let start = 10u64;
3965        let end = 9u64;
3966        let result = provider.receipts_by_block_range(start..=end).unwrap();
3967        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3968    }
3969
3970    #[test]
3971    fn unwind_commit_waits_for_pre_commit_readers() {
3972        let factory = create_test_provider_factory();
3973
3974        let reader = factory.provider().unwrap();
3975        let provider_rw = factory.unwind_provider_rw().unwrap();
3976        provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
3977        let (done_tx, done_rx) = mpsc::channel();
3978
3979        let handle = std::thread::spawn(move || {
3980            let result = provider_rw.commit();
3981            done_tx.send(result).unwrap();
3982        });
3983
3984        assert!(
3985            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
3986            "unwind commit should wait while an older read transaction is still open"
3987        );
3988
3989        drop(reader);
3990
3991        done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
3992        handle.join().unwrap();
3993    }
3994
3995    #[test]
3996    fn test_receipts_by_block_range_nonexistent_blocks() {
3997        let factory = create_test_provider_factory();
3998        let provider = factory.provider().unwrap();
3999
4000        // non-existent blocks should return empty vecs for each block
4001        let result = provider.receipts_by_block_range(10..=12).unwrap();
4002        assert_eq!(result, vec![vec![], vec![], vec![]]);
4003    }
4004
4005    #[test]
4006    fn test_receipts_by_block_range_single_block() {
4007        let factory = create_test_provider_factory();
4008        let data = BlockchainTestData::default();
4009
4010        let provider_rw = factory.provider_rw().unwrap();
4011        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4012        provider_rw
4013            .write_state(
4014                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4015                crate::OriginalValuesKnown::No,
4016                StateWriteConfig::default(),
4017            )
4018            .unwrap();
4019        provider_rw.insert_block(&data.blocks[0].0).unwrap();
4020        provider_rw
4021            .write_state(
4022                &data.blocks[0].1,
4023                crate::OriginalValuesKnown::No,
4024                StateWriteConfig::default(),
4025            )
4026            .unwrap();
4027        provider_rw.commit().unwrap();
4028
4029        let provider = factory.provider().unwrap();
4030        let result = provider.receipts_by_block_range(1..=1).unwrap();
4031
4032        // should have one vec with one receipt
4033        assert_eq!(result.len(), 1);
4034        assert_eq!(result[0].len(), 1);
4035        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
4036    }
4037
4038    #[test]
4039    fn test_receipts_by_block_range_multiple_blocks() {
4040        let factory = create_test_provider_factory();
4041        let data = BlockchainTestData::default();
4042
4043        let provider_rw = factory.provider_rw().unwrap();
4044        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4045        provider_rw
4046            .write_state(
4047                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4048                crate::OriginalValuesKnown::No,
4049                StateWriteConfig::default(),
4050            )
4051            .unwrap();
4052        for i in 0..3 {
4053            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4054            provider_rw
4055                .write_state(
4056                    &data.blocks[i].1,
4057                    crate::OriginalValuesKnown::No,
4058                    StateWriteConfig::default(),
4059                )
4060                .unwrap();
4061        }
4062        provider_rw.commit().unwrap();
4063
4064        let provider = factory.provider().unwrap();
4065        let result = provider.receipts_by_block_range(1..=3).unwrap();
4066
4067        // should have 3 vecs, each with one receipt
4068        assert_eq!(result.len(), 3);
4069        for (i, block_receipts) in result.iter().enumerate() {
4070            assert_eq!(block_receipts.len(), 1);
4071            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4072        }
4073    }
4074
4075    #[test]
4076    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4077        let factory = create_test_provider_factory();
4078        let data = BlockchainTestData::default();
4079
4080        let provider_rw = factory.provider_rw().unwrap();
4081        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4082        provider_rw
4083            .write_state(
4084                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4085                crate::OriginalValuesKnown::No,
4086                StateWriteConfig::default(),
4087            )
4088            .unwrap();
4089
4090        // insert blocks 1-3 with receipts
4091        for i in 0..3 {
4092            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4093            provider_rw
4094                .write_state(
4095                    &data.blocks[i].1,
4096                    crate::OriginalValuesKnown::No,
4097                    StateWriteConfig::default(),
4098                )
4099                .unwrap();
4100        }
4101        provider_rw.commit().unwrap();
4102
4103        let provider = factory.provider().unwrap();
4104        let result = provider.receipts_by_block_range(1..=3).unwrap();
4105
4106        // verify each block has one receipt
4107        assert_eq!(result.len(), 3);
4108        for block_receipts in &result {
4109            assert_eq!(block_receipts.len(), 1);
4110        }
4111    }
4112
4113    #[test]
4114    fn test_receipts_by_block_range_partial_range() {
4115        let factory = create_test_provider_factory();
4116        let data = BlockchainTestData::default();
4117
4118        let provider_rw = factory.provider_rw().unwrap();
4119        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4120        provider_rw
4121            .write_state(
4122                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4123                crate::OriginalValuesKnown::No,
4124                StateWriteConfig::default(),
4125            )
4126            .unwrap();
4127        for i in 0..3 {
4128            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4129            provider_rw
4130                .write_state(
4131                    &data.blocks[i].1,
4132                    crate::OriginalValuesKnown::No,
4133                    StateWriteConfig::default(),
4134                )
4135                .unwrap();
4136        }
4137        provider_rw.commit().unwrap();
4138
4139        let provider = factory.provider().unwrap();
4140
4141        // request range that includes both existing and non-existing blocks
4142        let result = provider.receipts_by_block_range(2..=5).unwrap();
4143        assert_eq!(result.len(), 4);
4144
4145        // blocks 2-3 should have receipts, blocks 4-5 should be empty
4146        assert_eq!(result[0].len(), 1); // block 2
4147        assert_eq!(result[1].len(), 1); // block 3
4148        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
4149        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
4150
4151        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4152        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4153    }
4154
4155    #[test]
4156    fn test_receipts_by_block_range_all_empty_blocks() {
4157        let factory = create_test_provider_factory();
4158        let mut rng = generators::rng();
4159
4160        // create blocks with no transactions
4161        let mut blocks = Vec::new();
4162        for i in 0..3 {
4163            let block =
4164                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4165            blocks.push(block);
4166        }
4167
4168        let provider_rw = factory.provider_rw().unwrap();
4169        for block in blocks {
4170            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4171        }
4172        provider_rw.commit().unwrap();
4173
4174        let provider = factory.provider().unwrap();
4175        let result = provider.receipts_by_block_range(1..=3).unwrap();
4176
4177        assert_eq!(result.len(), 3);
4178        for block_receipts in result {
4179            assert_eq!(block_receipts.len(), 0);
4180        }
4181    }
4182
4183    #[test]
4184    fn test_receipts_by_block_range_consistency_with_individual_calls() {
4185        let factory = create_test_provider_factory();
4186        let data = BlockchainTestData::default();
4187
4188        let provider_rw = factory.provider_rw().unwrap();
4189        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4190        provider_rw
4191            .write_state(
4192                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4193                crate::OriginalValuesKnown::No,
4194                StateWriteConfig::default(),
4195            )
4196            .unwrap();
4197        for i in 0..3 {
4198            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4199            provider_rw
4200                .write_state(
4201                    &data.blocks[i].1,
4202                    crate::OriginalValuesKnown::No,
4203                    StateWriteConfig::default(),
4204                )
4205                .unwrap();
4206        }
4207        provider_rw.commit().unwrap();
4208
4209        let provider = factory.provider().unwrap();
4210
4211        // get receipts using block range method
4212        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4213
4214        // get receipts using individual block calls
4215        let mut individual_results = Vec::new();
4216        for block_num in 1..=3 {
4217            let receipts =
4218                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4219            individual_results.push(receipts);
4220        }
4221
4222        assert_eq!(range_result, individual_results);
4223    }
4224
4225    #[test]
4226    fn test_write_trie_updates_sorted() {
4227        use reth_trie::{
4228            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4229            BranchNodeCompact, StorageTrieEntry,
4230        };
4231
4232        let factory = create_test_provider_factory();
4233        let provider_rw = factory.provider_rw().unwrap();
4234
4235        // Pre-populate account trie with data that will be deleted
4236        {
4237            let tx = provider_rw.tx_ref();
4238            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4239
4240            // Add account node that will be deleted
4241            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4242            cursor
4243                .upsert(
4244                    to_delete,
4245                    &BranchNodeCompact::new(
4246                        0b1010_1010_1010_1010, // state_mask
4247                        0b0000_0000_0000_0000, // tree_mask
4248                        0b0000_0000_0000_0000, // hash_mask
4249                        vec![],
4250                        None,
4251                    ),
4252                )
4253                .unwrap();
4254
4255            // Add account node that will be updated
4256            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4257            cursor
4258                .upsert(
4259                    to_update,
4260                    &BranchNodeCompact::new(
4261                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4262                        0b0000_0000_0000_0000, // tree_mask
4263                        0b0000_0000_0000_0000, // hash_mask
4264                        vec![],
4265                        None,
4266                    ),
4267                )
4268                .unwrap();
4269        }
4270
4271        // Pre-populate storage tries with data
4272        let storage_address1 = B256::from([1u8; 32]);
4273        let storage_address2 = B256::from([2u8; 32]);
4274        {
4275            let tx = provider_rw.tx_ref();
4276            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4277
4278            // Add storage nodes for address1 (one will be deleted)
4279            storage_cursor
4280                .upsert(
4281                    storage_address1,
4282                    &StorageTrieEntry {
4283                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4284                        node: BranchNodeCompact::new(
4285                            0b0011_0011_0011_0011, // will be deleted
4286                            0b0000_0000_0000_0000,
4287                            0b0000_0000_0000_0000,
4288                            vec![],
4289                            None,
4290                        ),
4291                    },
4292                )
4293                .unwrap();
4294
4295            // Add storage nodes for address2 (will be wiped)
4296            storage_cursor
4297                .upsert(
4298                    storage_address2,
4299                    &StorageTrieEntry {
4300                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4301                        node: BranchNodeCompact::new(
4302                            0b1100_1100_1100_1100, // will be wiped
4303                            0b0000_0000_0000_0000,
4304                            0b0000_0000_0000_0000,
4305                            vec![],
4306                            None,
4307                        ),
4308                    },
4309                )
4310                .unwrap();
4311            storage_cursor
4312                .upsert(
4313                    storage_address2,
4314                    &StorageTrieEntry {
4315                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4316                        node: BranchNodeCompact::new(
4317                            0b0011_1100_0011_1100, // will be wiped
4318                            0b0000_0000_0000_0000,
4319                            0b0000_0000_0000_0000,
4320                            vec![],
4321                            None,
4322                        ),
4323                    },
4324                )
4325                .unwrap();
4326        }
4327
4328        // Create sorted account trie updates
4329        let account_nodes = vec![
4330            (
4331                Nibbles::from_nibbles([0x1, 0x2]),
4332                Some(BranchNodeCompact::new(
4333                    0b1111_1111_1111_1111, // state_mask (updated)
4334                    0b0000_0000_0000_0000, // tree_mask
4335                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4336                    vec![],
4337                    None,
4338                )),
4339            ),
4340            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4341            (
4342                Nibbles::from_nibbles([0x5, 0x6]),
4343                Some(BranchNodeCompact::new(
4344                    0b1111_1111_1111_1111, // state_mask
4345                    0b0000_0000_0000_0000, // tree_mask
4346                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4347                    vec![],
4348                    None,
4349                )),
4350            ),
4351        ];
4352
4353        // Create sorted storage trie updates
4354        let storage_trie1 = StorageTrieUpdatesSorted {
4355            is_deleted: false,
4356            storage_nodes: vec![
4357                (
4358                    Nibbles::from_nibbles([0x1, 0x0]),
4359                    Some(BranchNodeCompact::new(
4360                        0b1111_0000_0000_0000, // state_mask
4361                        0b0000_0000_0000_0000, // tree_mask
4362                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4363                        vec![],
4364                        None,
4365                    )),
4366                ),
4367                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4368            ],
4369        };
4370
4371        let storage_trie2 = StorageTrieUpdatesSorted {
4372            is_deleted: true, // Wipe all storage for this address
4373            storage_nodes: vec![],
4374        };
4375
4376        let mut storage_tries = B256Map::default();
4377        storage_tries.insert(storage_address1, storage_trie1);
4378        storage_tries.insert(storage_address2, storage_trie2);
4379
4380        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4381
4382        // Write the sorted trie updates
4383        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4384
4385        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4386        // storage deletion = 5
4387        assert_eq!(num_entries, 5);
4388
4389        // Verify account trie updates were written correctly
4390        let tx = provider_rw.tx_ref();
4391        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4392
4393        // Check first account node was updated
4394        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4395        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4396        assert!(entry1.is_some(), "Updated account node should exist");
4397        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4398        assert_eq!(
4399            entry1.unwrap().1.state_mask,
4400            expected_mask,
4401            "Account node should have updated state_mask"
4402        );
4403
4404        // Check deleted account node no longer exists
4405        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4406        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4407        assert!(entry2.is_none(), "Deleted account node should not exist");
4408
4409        // Check new account node exists
4410        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4411        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4412        assert!(entry3.is_some(), "New account node should exist");
4413
4414        // Verify storage trie updates were written correctly
4415        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4416
4417        // Check storage for address1
4418        let storage_entries1: Vec<_> = storage_cursor
4419            .walk_dup(Some(storage_address1), None)
4420            .unwrap()
4421            .collect::<Result<Vec<_>, _>>()
4422            .unwrap();
4423        assert_eq!(
4424            storage_entries1.len(),
4425            1,
4426            "Storage address1 should have 1 entry after deletion"
4427        );
4428        assert_eq!(
4429            storage_entries1[0].1.nibbles.0,
4430            Nibbles::from_nibbles([0x1, 0x0]),
4431            "Remaining entry should be [0x1, 0x0]"
4432        );
4433
4434        // Check storage for address2 was wiped
4435        let storage_entries2: Vec<_> = storage_cursor
4436            .walk_dup(Some(storage_address2), None)
4437            .unwrap()
4438            .collect::<Result<Vec<_>, _>>()
4439            .unwrap();
4440        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4441
4442        provider_rw.commit().unwrap();
4443    }
4444
4445    #[test]
4446    fn test_prunable_receipts_logic() {
4447        let insert_blocks =
4448            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4449                let mut rng = generators::rng();
4450                for block_num in 0..=tip_block {
4451                    let block = random_block(
4452                        &mut rng,
4453                        block_num,
4454                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4455                    );
4456                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4457                }
4458            };
4459
4460        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4461            let outcome = ExecutionOutcome {
4462                first_block: block,
4463                receipts: vec![vec![Receipt {
4464                    tx_type: Default::default(),
4465                    success: true,
4466                    cumulative_gas_used: block, // identifier to assert against
4467                    logs: vec![],
4468                }]],
4469                ..Default::default()
4470            };
4471            provider_rw
4472                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4473                .unwrap();
4474            provider_rw.commit().unwrap();
4475        };
4476
4477        // Legacy mode (receipts in DB) - should be prunable
4478        {
4479            let factory = create_test_provider_factory();
4480            let storage_settings = StorageSettings::v1();
4481            factory.set_storage_settings_cache(storage_settings);
4482            let factory = factory.with_prune_modes(PruneModes {
4483                receipts: Some(PruneMode::Before(100)),
4484                ..Default::default()
4485            });
4486
4487            let tip_block = 200u64;
4488            let first_block = 1u64;
4489
4490            // create chain
4491            let provider_rw = factory.provider_rw().unwrap();
4492            insert_blocks(&provider_rw, tip_block, 1);
4493            provider_rw.commit().unwrap();
4494
4495            write_receipts(
4496                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4497                first_block,
4498            );
4499            write_receipts(
4500                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4501                tip_block - 1,
4502            );
4503
4504            let provider = factory.provider().unwrap();
4505
4506            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4507                assert!(provider
4508                    .receipts_by_block(block.into())
4509                    .unwrap()
4510                    .is_some_and(|r| r.len() == num_receipts));
4511            }
4512        }
4513
4514        // Static files mode
4515        {
4516            let factory = create_test_provider_factory();
4517            let storage_settings = StorageSettings::v2();
4518            factory.set_storage_settings_cache(storage_settings);
4519            let factory = factory.with_prune_modes(PruneModes {
4520                receipts: Some(PruneMode::Before(2)),
4521                ..Default::default()
4522            });
4523
4524            let tip_block = 200u64;
4525
4526            // create chain
4527            let provider_rw = factory.provider_rw().unwrap();
4528            insert_blocks(&provider_rw, tip_block, 1);
4529            provider_rw.commit().unwrap();
4530
4531            // Attempt to write receipts for block 0 and 1 (should be skipped)
4532            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4533            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4534
4535            assert!(factory
4536                .static_file_provider()
4537                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4538                .is_none(),);
4539            assert!(factory
4540                .static_file_provider()
4541                .get_highest_static_file_block(StaticFileSegment::Receipts)
4542                .is_some_and(|b| b == 1),);
4543
4544            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4545            // static files.
4546            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4547            assert!(factory
4548                .static_file_provider()
4549                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4550                .is_some_and(|num| num == 2),);
4551
4552            // After having a receipt already in static files, attempt to skip the next receipt by
4553            // changing the prune mode. It should NOT skip it and should still write the receipt,
4554            // since static files do not support gaps.
4555            let factory = factory.with_prune_modes(PruneModes {
4556                receipts: Some(PruneMode::Before(100)),
4557                ..Default::default()
4558            });
4559            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4560            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4561            write_receipts(provider_rw, 3);
4562
4563            // Ensure we can only fetch the 2 last receipts.
4564            //
4565            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4566            // to the block number it belongs to easily identify and assert.
4567            let provider = factory.provider().unwrap();
4568            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4569            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4570                assert!(provider
4571                    .receipts_by_block(num.into())
4572                    .unwrap()
4573                    .is_some_and(|r| r.len() == num_receipts));
4574
4575                let receipt = provider.receipt(num).unwrap();
4576                if num_receipts > 0 {
4577                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4578                } else {
4579                    assert!(receipt.is_none());
4580                }
4581            }
4582        }
4583    }
4584
4585    #[test]
4586    fn test_try_into_history_rejects_unexecuted_blocks() {
4587        use reth_storage_api::TryIntoHistoricalStateProvider;
4588
4589        let factory = create_test_provider_factory();
4590
4591        // Insert genesis block to have some data
4592        let data = BlockchainTestData::default();
4593        let provider_rw = factory.provider_rw().unwrap();
4594        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4595        provider_rw
4596            .write_state(
4597                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4598                crate::OriginalValuesKnown::No,
4599                StateWriteConfig::default(),
4600            )
4601            .unwrap();
4602        provider_rw.commit().unwrap();
4603
4604        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4605        let provider = factory.provider().unwrap();
4606
4607        // Requesting historical state for block 0 (executed) should succeed
4608        let result = provider.try_into_history_at_block(0);
4609        assert!(result.is_ok(), "Block 0 should be available");
4610
4611        // Get another provider and request state for block 100 (not executed)
4612        let provider = factory.provider().unwrap();
4613        let result = provider.try_into_history_at_block(100);
4614
4615        // Should fail with BlockNotExecuted error
4616        match result {
4617            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4618            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4619            Ok(_) => panic!("Expected error, got Ok"),
4620        }
4621    }
4622
4623    #[test]
4624    fn test_unwind_storage_hashing_with_hashed_state() {
4625        let factory = create_test_provider_factory();
4626        let storage_settings = StorageSettings::v2();
4627        factory.set_storage_settings_cache(storage_settings);
4628
4629        let address = Address::random();
4630        let hashed_address = keccak256(address);
4631
4632        let plain_slot = B256::random();
4633        let hashed_slot = keccak256(plain_slot);
4634
4635        let current_value = U256::from(100);
4636        let old_value = U256::from(42);
4637
4638        let provider_rw = factory.provider_rw().unwrap();
4639        provider_rw
4640            .tx
4641            .cursor_dup_write::<tables::HashedStorages>()
4642            .unwrap()
4643            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4644            .unwrap();
4645
4646        let changesets = vec![(
4647            BlockNumberAddress((1, address)),
4648            StorageEntry { key: plain_slot, value: old_value },
4649        )];
4650
4651        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4652
4653        assert_eq!(result.len(), 1);
4654        assert!(result.contains_key(&hashed_address));
4655        assert!(result[&hashed_address].contains(&hashed_slot));
4656
4657        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4658        let entry = cursor
4659            .seek_by_key_subkey(hashed_address, hashed_slot)
4660            .unwrap()
4661            .expect("entry should exist");
4662        assert_eq!(entry.key, hashed_slot);
4663        assert_eq!(entry.value, old_value);
4664    }
4665
4666    #[test]
4667    fn test_write_and_remove_state_roundtrip_legacy() {
4668        let factory = create_test_provider_factory();
4669        let storage_settings = StorageSettings::v1();
4670        assert!(!storage_settings.use_hashed_state());
4671        factory.set_storage_settings_cache(storage_settings);
4672
4673        let address = Address::with_last_byte(1);
4674        let hashed_address = keccak256(address);
4675        let slot = U256::from(5);
4676        let slot_key = B256::from(slot);
4677        let hashed_slot = keccak256(slot_key);
4678
4679        let mut rng = generators::rng();
4680        let block0 =
4681            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4682        let block1 =
4683            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4684
4685        {
4686            let provider_rw = factory.provider_rw().unwrap();
4687            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4688            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4689            provider_rw
4690                .tx
4691                .cursor_write::<tables::PlainAccountState>()
4692                .unwrap()
4693                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4694                .unwrap();
4695            provider_rw.commit().unwrap();
4696        }
4697
4698        let provider_rw = factory.provider_rw().unwrap();
4699
4700        let mut state_init: BundleStateInit = AddressMap::default();
4701        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4702        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4703        state_init.insert(
4704            address,
4705            (
4706                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4707                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4708                storage_map,
4709            ),
4710        );
4711
4712        let mut reverts_init: RevertsInit = HashMap::default();
4713        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4714        block_reverts.insert(
4715            address,
4716            (
4717                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4718                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4719            ),
4720        );
4721        reverts_init.insert(1, block_reverts);
4722
4723        let execution_outcome =
4724            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4725
4726        provider_rw
4727            .write_state(
4728                &execution_outcome,
4729                OriginalValuesKnown::Yes,
4730                StateWriteConfig {
4731                    write_receipts: false,
4732                    write_account_changesets: true,
4733                    write_storage_changesets: true,
4734                },
4735            )
4736            .unwrap();
4737
4738        let hashed_state =
4739            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4740        provider_rw.write_hashed_state(&hashed_state).unwrap();
4741
4742        let account = provider_rw
4743            .tx
4744            .cursor_read::<tables::PlainAccountState>()
4745            .unwrap()
4746            .seek_exact(address)
4747            .unwrap()
4748            .unwrap()
4749            .1;
4750        assert_eq!(account.nonce, 1);
4751
4752        let storage_entry = provider_rw
4753            .tx
4754            .cursor_dup_read::<tables::PlainStorageState>()
4755            .unwrap()
4756            .seek_by_key_subkey(address, slot_key)
4757            .unwrap()
4758            .unwrap();
4759        assert_eq!(storage_entry.key, slot_key);
4760        assert_eq!(storage_entry.value, U256::from(10));
4761
4762        let hashed_entry = provider_rw
4763            .tx
4764            .cursor_dup_read::<tables::HashedStorages>()
4765            .unwrap()
4766            .seek_by_key_subkey(hashed_address, hashed_slot)
4767            .unwrap()
4768            .unwrap();
4769        assert_eq!(hashed_entry.key, hashed_slot);
4770        assert_eq!(hashed_entry.value, U256::from(10));
4771
4772        let account_cs_entries = provider_rw
4773            .tx
4774            .cursor_dup_read::<tables::AccountChangeSets>()
4775            .unwrap()
4776            .walk(Some(1))
4777            .unwrap()
4778            .collect::<Result<Vec<_>, _>>()
4779            .unwrap();
4780        assert!(!account_cs_entries.is_empty());
4781
4782        let storage_cs_entries = provider_rw
4783            .tx
4784            .cursor_read::<tables::StorageChangeSets>()
4785            .unwrap()
4786            .walk(Some(BlockNumberAddress((1, address))))
4787            .unwrap()
4788            .collect::<Result<Vec<_>, _>>()
4789            .unwrap();
4790        assert!(!storage_cs_entries.is_empty());
4791        assert_eq!(storage_cs_entries[0].1.key, slot_key);
4792
4793        provider_rw.remove_state_above(0).unwrap();
4794
4795        let restored_account = provider_rw
4796            .tx
4797            .cursor_read::<tables::PlainAccountState>()
4798            .unwrap()
4799            .seek_exact(address)
4800            .unwrap()
4801            .unwrap()
4802            .1;
4803        assert_eq!(restored_account.nonce, 0);
4804
4805        let storage_gone = provider_rw
4806            .tx
4807            .cursor_dup_read::<tables::PlainStorageState>()
4808            .unwrap()
4809            .seek_by_key_subkey(address, slot_key)
4810            .unwrap();
4811        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4812
4813        let account_cs_after = provider_rw
4814            .tx
4815            .cursor_dup_read::<tables::AccountChangeSets>()
4816            .unwrap()
4817            .walk(Some(1))
4818            .unwrap()
4819            .collect::<Result<Vec<_>, _>>()
4820            .unwrap();
4821        assert!(account_cs_after.is_empty());
4822
4823        let storage_cs_after = provider_rw
4824            .tx
4825            .cursor_read::<tables::StorageChangeSets>()
4826            .unwrap()
4827            .walk(Some(BlockNumberAddress((1, address))))
4828            .unwrap()
4829            .collect::<Result<Vec<_>, _>>()
4830            .unwrap();
4831        assert!(storage_cs_after.is_empty());
4832    }
4833
4834    #[test]
4835    fn test_unwind_storage_hashing_legacy() {
4836        let factory = create_test_provider_factory();
4837        let storage_settings = StorageSettings::v1();
4838        assert!(!storage_settings.use_hashed_state());
4839        factory.set_storage_settings_cache(storage_settings);
4840
4841        let address = Address::random();
4842        let hashed_address = keccak256(address);
4843
4844        let plain_slot = B256::random();
4845        let hashed_slot = keccak256(plain_slot);
4846
4847        let current_value = U256::from(100);
4848        let old_value = U256::from(42);
4849
4850        let provider_rw = factory.provider_rw().unwrap();
4851        provider_rw
4852            .tx
4853            .cursor_dup_write::<tables::HashedStorages>()
4854            .unwrap()
4855            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4856            .unwrap();
4857
4858        let changesets = vec![(
4859            BlockNumberAddress((1, address)),
4860            StorageEntry { key: plain_slot, value: old_value },
4861        )];
4862
4863        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4864
4865        assert_eq!(result.len(), 1);
4866        assert!(result.contains_key(&hashed_address));
4867        assert!(result[&hashed_address].contains(&hashed_slot));
4868
4869        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4870        let entry = cursor
4871            .seek_by_key_subkey(hashed_address, hashed_slot)
4872            .unwrap()
4873            .expect("entry should exist");
4874        assert_eq!(entry.key, hashed_slot);
4875        assert_eq!(entry.value, old_value);
4876    }
4877
4878    #[test]
4879    fn test_write_state_and_historical_read_hashed() {
4880        use reth_storage_api::StateProvider;
4881        use reth_trie::{HashedPostState, KeccakKeyHasher};
4882        use revm_database::BundleState;
4883        use revm_state::AccountInfo;
4884
4885        let factory = create_test_provider_factory();
4886        factory.set_storage_settings_cache(StorageSettings::v2());
4887
4888        let address = Address::with_last_byte(1);
4889        let slot = U256::from(5);
4890        let slot_key = B256::from(slot);
4891        let hashed_address = keccak256(address);
4892        let hashed_slot = keccak256(slot_key);
4893
4894        {
4895            let sf = factory.static_file_provider();
4896            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4897            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4898            hw.append_header(&h0, &B256::ZERO).unwrap();
4899            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4900            hw.append_header(&h1, &B256::ZERO).unwrap();
4901            hw.commit().unwrap();
4902
4903            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4904            aw.append_account_changeset(vec![], 0).unwrap();
4905            aw.commit().unwrap();
4906
4907            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4908            sw.append_storage_changeset(vec![], 0).unwrap();
4909            sw.commit().unwrap();
4910        }
4911
4912        let provider_rw = factory.provider_rw().unwrap();
4913
4914        let bundle = BundleState::builder(1..=1)
4915            .state_present_account_info(
4916                address,
4917                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4918            )
4919            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4920            .revert_account_info(1, address, Some(None))
4921            .revert_storage(1, address, vec![(slot, U256::ZERO)])
4922            .build();
4923
4924        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4925
4926        provider_rw
4927            .tx
4928            .put::<tables::BlockBodyIndices>(
4929                1,
4930                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4931            )
4932            .unwrap();
4933
4934        provider_rw
4935            .write_state(
4936                &execution_outcome,
4937                OriginalValuesKnown::Yes,
4938                StateWriteConfig {
4939                    write_receipts: false,
4940                    write_account_changesets: true,
4941                    write_storage_changesets: true,
4942                },
4943            )
4944            .unwrap();
4945
4946        let hashed_state =
4947            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4948        provider_rw.write_hashed_state(&hashed_state).unwrap();
4949
4950        let plain_storage_entries = provider_rw
4951            .tx
4952            .cursor_dup_read::<tables::PlainStorageState>()
4953            .unwrap()
4954            .walk(None)
4955            .unwrap()
4956            .collect::<Result<Vec<_>, _>>()
4957            .unwrap();
4958        assert!(plain_storage_entries.is_empty());
4959
4960        let hashed_entry = provider_rw
4961            .tx
4962            .cursor_dup_read::<tables::HashedStorages>()
4963            .unwrap()
4964            .seek_by_key_subkey(hashed_address, hashed_slot)
4965            .unwrap()
4966            .unwrap();
4967        assert_eq!(hashed_entry.key, hashed_slot);
4968        assert_eq!(hashed_entry.value, U256::from(10));
4969
4970        provider_rw.static_file_provider().commit().unwrap();
4971
4972        let sf = factory.static_file_provider();
4973        let storage_cs = sf.storage_changeset(1).unwrap();
4974        assert!(!storage_cs.is_empty());
4975        assert_eq!(storage_cs[0].1.key, slot_key);
4976
4977        let account_cs = sf.account_block_changeset(1).unwrap();
4978        assert!(!account_cs.is_empty());
4979        assert_eq!(account_cs[0].address, address);
4980
4981        let historical_value =
4982            HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
4983                .storage(address, slot_key)
4984                .unwrap();
4985        assert_eq!(historical_value, None);
4986    }
4987
4988    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4989    enum StorageMode {
4990        V1,
4991        V2,
4992    }
4993
4994    fn run_save_blocks_and_verify(mode: StorageMode) {
4995        use alloy_primitives::map::{FbBuildHasher, HashMap};
4996
4997        let factory = create_test_provider_factory();
4998
4999        match mode {
5000            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
5001            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
5002        }
5003
5004        let num_blocks = 3u64;
5005        let accounts_per_block = 5usize;
5006        let slots_per_account = 3usize;
5007
5008        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
5009            SealedHeader::new(
5010                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
5011                B256::ZERO,
5012            ),
5013            Default::default(),
5014        );
5015
5016        let genesis_executed = ExecutedBlock::new(
5017            Arc::new(genesis.try_recover().unwrap()),
5018            Arc::new(BlockExecutionOutput {
5019                result: BlockExecutionResult {
5020                    receipts: vec![],
5021                    requests: Default::default(),
5022                    gas_used: 0,
5023                    blob_gas_used: 0,
5024                },
5025                state: Default::default(),
5026            }),
5027            ComputedTrieData::default(),
5028        );
5029        let provider_rw = factory.provider_rw().unwrap();
5030        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5031        provider_rw.commit().unwrap();
5032
5033        let mut blocks: Vec<ExecutedBlock> = Vec::new();
5034        let mut parent_hash = B256::ZERO;
5035
5036        for block_num in 1..=num_blocks {
5037            let mut builder = BundleState::builder(block_num..=block_num);
5038
5039            for acct_idx in 0..accounts_per_block {
5040                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5041                let info = AccountInfo {
5042                    nonce: block_num,
5043                    balance: U256::from(block_num * 100 + acct_idx as u64),
5044                    ..Default::default()
5045                };
5046
5047                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5048                    slots_per_account as u64)
5049                    .map(|s| {
5050                        (
5051                            U256::from(s + acct_idx as u64 * 100),
5052                            (U256::ZERO, U256::from(block_num * 1000 + s)),
5053                        )
5054                    })
5055                    .collect();
5056
5057                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5058                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5059                    .collect();
5060
5061                builder = builder
5062                    .state_present_account_info(address, info)
5063                    .revert_account_info(block_num, address, Some(None))
5064                    .state_storage(address, storage)
5065                    .revert_storage(block_num, address, revert_storage);
5066            }
5067
5068            let bundle = builder.build();
5069
5070            let hashed_state =
5071                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5072
5073            let header = Header {
5074                number: block_num,
5075                parent_hash,
5076                difficulty: U256::from(1),
5077                ..Default::default()
5078            };
5079            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5080                header,
5081                Default::default(),
5082            );
5083            parent_hash = block.hash();
5084
5085            let executed = ExecutedBlock::new(
5086                Arc::new(block.try_recover().unwrap()),
5087                Arc::new(BlockExecutionOutput {
5088                    result: BlockExecutionResult {
5089                        receipts: vec![],
5090                        requests: Default::default(),
5091                        gas_used: 0,
5092                        blob_gas_used: 0,
5093                    },
5094                    state: bundle,
5095                }),
5096                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5097            );
5098            blocks.push(executed);
5099        }
5100
5101        let provider_rw = factory.provider_rw().unwrap();
5102        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5103        provider_rw.commit().unwrap();
5104
5105        let provider = factory.provider().unwrap();
5106
5107        for block_num in 1..=num_blocks {
5108            for acct_idx in 0..accounts_per_block {
5109                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5110                let hashed_address = keccak256(address);
5111
5112                let ha_entry = provider
5113                    .tx_ref()
5114                    .cursor_read::<tables::HashedAccounts>()
5115                    .unwrap()
5116                    .seek_exact(hashed_address)
5117                    .unwrap();
5118                assert!(
5119                    ha_entry.is_some(),
5120                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5121                );
5122
5123                for s in 1..=slots_per_account as u64 {
5124                    let slot = U256::from(s + acct_idx as u64 * 100);
5125                    let slot_key = B256::from(slot);
5126                    let hashed_slot = keccak256(slot_key);
5127
5128                    let hs_entry = provider
5129                        .tx_ref()
5130                        .cursor_dup_read::<tables::HashedStorages>()
5131                        .unwrap()
5132                        .seek_by_key_subkey(hashed_address, hashed_slot)
5133                        .unwrap();
5134                    assert!(
5135                        hs_entry.is_some(),
5136                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5137                    );
5138                    let entry = hs_entry.unwrap();
5139                    assert_eq!(entry.key, hashed_slot);
5140                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5141                }
5142            }
5143        }
5144
5145        for block_num in 1..=num_blocks {
5146            let header = provider.header_by_number(block_num).unwrap();
5147            assert!(header.is_some(), "Header missing for block {block_num}");
5148
5149            let indices = provider.block_body_indices(block_num).unwrap();
5150            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5151        }
5152
5153        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5154        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5155
5156        if mode == StorageMode::V2 {
5157            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5158            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5159
5160            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5161            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5162
5163            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5164            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5165
5166            provider.static_file_provider().commit().unwrap();
5167            let sf = factory.static_file_provider();
5168
5169            for block_num in 1..=num_blocks {
5170                let account_cs = sf.account_block_changeset(block_num).unwrap();
5171                assert!(
5172                    !account_cs.is_empty(),
5173                    "v2: static file AccountChangeSets should exist for block {block_num}"
5174                );
5175
5176                let storage_cs = sf.storage_changeset(block_num).unwrap();
5177                assert!(
5178                    !storage_cs.is_empty(),
5179                    "v2: static file StorageChangeSets should exist for block {block_num}"
5180                );
5181
5182                for (_, entry) in &storage_cs {
5183                    assert!(
5184                        entry.key != keccak256(entry.key),
5185                        "v2: static file storage changeset should have plain slot keys"
5186                    );
5187                }
5188            }
5189
5190            let rocksdb = factory.rocksdb_provider();
5191            for block_num in 1..=num_blocks {
5192                for acct_idx in 0..accounts_per_block {
5193                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5194                    let shards = rocksdb.account_history_shards(address).unwrap();
5195                    assert!(
5196                        !shards.is_empty(),
5197                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5198                    );
5199
5200                    for s in 1..=slots_per_account as u64 {
5201                        let slot = U256::from(s + acct_idx as u64 * 100);
5202                        let slot_key = B256::from(slot);
5203                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5204                        assert!(
5205                            !shards.is_empty(),
5206                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5207                        );
5208                    }
5209                }
5210            }
5211        } else {
5212            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5213            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5214
5215            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5216            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5217
5218            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5219            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5220
5221            for block_num in 1..=num_blocks {
5222                let storage_entries: Vec<_> = provider
5223                    .tx_ref()
5224                    .cursor_dup_read::<tables::StorageChangeSets>()
5225                    .unwrap()
5226                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5227                    .unwrap()
5228                    .collect::<Result<Vec<_>, _>>()
5229                    .unwrap();
5230                assert!(
5231                    !storage_entries.is_empty(),
5232                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5233                );
5234
5235                for (_, entry) in &storage_entries {
5236                    let slot_key = B256::from(entry.key);
5237                    assert!(
5238                        slot_key != keccak256(slot_key),
5239                        "v1: storage changeset keys should be plain (not hashed)"
5240                    );
5241                }
5242            }
5243
5244            let mdbx_account_history =
5245                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5246            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5247
5248            let mdbx_storage_history =
5249                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5250            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5251        }
5252    }
5253
5254    #[test]
5255    fn test_save_blocks_v1_table_assertions() {
5256        run_save_blocks_and_verify(StorageMode::V1);
5257    }
5258
5259    #[test]
5260    fn test_save_blocks_v2_table_assertions() {
5261        run_save_blocks_and_verify(StorageMode::V2);
5262    }
5263
5264    #[test]
5265    fn test_write_and_remove_state_roundtrip_v2() {
5266        let factory = create_test_provider_factory();
5267        let storage_settings = StorageSettings::v2();
5268        assert!(storage_settings.use_hashed_state());
5269        factory.set_storage_settings_cache(storage_settings);
5270
5271        let address = Address::with_last_byte(1);
5272        let hashed_address = keccak256(address);
5273        let slot = U256::from(5);
5274        let slot_key = B256::from(slot);
5275        let hashed_slot = keccak256(slot_key);
5276
5277        {
5278            let sf = factory.static_file_provider();
5279            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5280            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5281            hw.append_header(&h0, &B256::ZERO).unwrap();
5282            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5283            hw.append_header(&h1, &B256::ZERO).unwrap();
5284            hw.commit().unwrap();
5285
5286            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5287            aw.append_account_changeset(vec![], 0).unwrap();
5288            aw.commit().unwrap();
5289
5290            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5291            sw.append_storage_changeset(vec![], 0).unwrap();
5292            sw.commit().unwrap();
5293        }
5294
5295        {
5296            let provider_rw = factory.provider_rw().unwrap();
5297            provider_rw
5298                .tx
5299                .put::<tables::BlockBodyIndices>(
5300                    0,
5301                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5302                )
5303                .unwrap();
5304            provider_rw
5305                .tx
5306                .put::<tables::BlockBodyIndices>(
5307                    1,
5308                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5309                )
5310                .unwrap();
5311            provider_rw
5312                .tx
5313                .cursor_write::<tables::HashedAccounts>()
5314                .unwrap()
5315                .upsert(
5316                    hashed_address,
5317                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5318                )
5319                .unwrap();
5320            provider_rw.commit().unwrap();
5321        }
5322
5323        let provider_rw = factory.provider_rw().unwrap();
5324
5325        let bundle = BundleState::builder(1..=1)
5326            .state_present_account_info(
5327                address,
5328                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5329            )
5330            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5331            .revert_account_info(1, address, Some(None))
5332            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5333            .build();
5334
5335        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5336
5337        provider_rw
5338            .write_state(
5339                &execution_outcome,
5340                OriginalValuesKnown::Yes,
5341                StateWriteConfig {
5342                    write_receipts: false,
5343                    write_account_changesets: true,
5344                    write_storage_changesets: true,
5345                },
5346            )
5347            .unwrap();
5348
5349        let hashed_state =
5350            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5351        provider_rw.write_hashed_state(&hashed_state).unwrap();
5352
5353        let hashed_account = provider_rw
5354            .tx
5355            .cursor_read::<tables::HashedAccounts>()
5356            .unwrap()
5357            .seek_exact(hashed_address)
5358            .unwrap()
5359            .unwrap()
5360            .1;
5361        assert_eq!(hashed_account.nonce, 1);
5362
5363        let hashed_entry = provider_rw
5364            .tx
5365            .cursor_dup_read::<tables::HashedStorages>()
5366            .unwrap()
5367            .seek_by_key_subkey(hashed_address, hashed_slot)
5368            .unwrap()
5369            .unwrap();
5370        assert_eq!(hashed_entry.key, hashed_slot);
5371        assert_eq!(hashed_entry.value, U256::from(10));
5372
5373        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5374        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5375
5376        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5377        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5378
5379        provider_rw.static_file_provider().commit().unwrap();
5380
5381        let sf = factory.static_file_provider();
5382        let storage_cs = sf.storage_changeset(1).unwrap();
5383        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5384        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5385
5386        provider_rw.remove_state_above(0).unwrap();
5387
5388        let restored_account = provider_rw
5389            .tx
5390            .cursor_read::<tables::HashedAccounts>()
5391            .unwrap()
5392            .seek_exact(hashed_address)
5393            .unwrap();
5394        assert!(
5395            restored_account.is_none(),
5396            "v2: account should be removed (didn't exist before block 1)"
5397        );
5398
5399        let storage_gone = provider_rw
5400            .tx
5401            .cursor_dup_read::<tables::HashedStorages>()
5402            .unwrap()
5403            .seek_by_key_subkey(hashed_address, hashed_slot)
5404            .unwrap();
5405        assert!(
5406            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5407            "v2: storage should be reverted (removed or different key)"
5408        );
5409
5410        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5411        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5412
5413        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5414        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5415    }
5416
5417    #[test]
5418    fn test_unwind_storage_history_indices_v2() {
5419        let factory = create_test_provider_factory();
5420        factory.set_storage_settings_cache(StorageSettings::v2());
5421
5422        let address = Address::with_last_byte(1);
5423        let slot_key = B256::from(U256::from(42));
5424
5425        {
5426            let rocksdb = factory.rocksdb_provider();
5427            let mut batch = rocksdb.batch();
5428            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5429            batch.commit().unwrap();
5430
5431            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5432            assert!(!shards.is_empty(), "history should be written to rocksdb");
5433        }
5434
5435        let provider_rw = factory.provider_rw().unwrap();
5436
5437        let changesets = vec![
5438            (
5439                BlockNumberAddress((7, address)),
5440                StorageEntry { key: slot_key, value: U256::from(5) },
5441            ),
5442            (
5443                BlockNumberAddress((10, address)),
5444                StorageEntry { key: slot_key, value: U256::from(8) },
5445            ),
5446        ];
5447
5448        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5449        assert_eq!(count, 2);
5450
5451        provider_rw.commit().unwrap();
5452
5453        let rocksdb = factory.rocksdb_provider();
5454        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5455
5456        assert!(
5457            !shards.is_empty(),
5458            "history shards should still exist with block 3 after partial unwind"
5459        );
5460
5461        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5462        assert!(all_blocks.contains(&3), "block 3 should remain");
5463        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5464        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5465    }
5466}