//! Database provider implementation.
//!
//! Source path: `reth_provider/providers/database/provider.rs`.
1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::{Database, ReaderTxnTracker},
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77    cmp::Ordering,
78    collections::{BTreeMap, BTreeSet},
79    fmt::Debug,
80    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81    path::PathBuf,
82    sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
/// Ordering in which the storage backends are made durable during a commit.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CommitOrder {
    /// Forward-sync order: static files first, then `RocksDB`, then MDBX.
    #[default]
    Normal,
    /// Unwind order: MDBX first, then `RocksDB`, then static files.
    /// Committing MDBX first lets an interrupted unwind be recovered by truncating
    /// static files on restart.
    Unwind,
}

impl CommitOrder {
    /// Returns true if this is unwind commit order.
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
103
/// A [`DatabaseProvider`] that holds a read-only database transaction.
///
/// The concrete read-only transaction type is taken from the [`Database`] impl (`DB::TX`).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
/// Once that issue is solved, we can probably revert back to being an alias type.
///
/// The inner field is public so callers can reach the wrapped provider directly;
/// [`Deref`]/[`DerefMut`] impls below make that mostly unnecessary.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
115
impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    /// Borrows the wrapped provider so [`DatabaseProvider`] methods can be called
    /// directly on a [`DatabaseProviderRW`].
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
123
impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    /// Mutably borrows the wrapped provider (mirror of the [`Deref`] impl above).
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
129
impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    /// Cheap reference conversion to the inner provider; useful where an
    /// `AsRef<DatabaseProvider<_, _>>` bound is required instead of `Deref`.
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}
137
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commit database transaction and static file if it exists.
    ///
    /// Delegates to the inner provider's `commit`; consumes `self` since the
    /// transaction cannot be reused after committing.
    pub fn commit(self) -> ProviderResult<()> {
        self.0.commit()
    }

    /// Consume `DbTx` or `DbTxMut`.
    ///
    /// Unwraps the provider and returns the raw read-write transaction.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }

    /// Override the minimum pruning distance for testing purposes.
    ///
    /// Builder-style setter; only compiled for tests / the `test-utils` feature.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.0.minimum_pruning_distance = distance;
        self
    }
}
156
impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    /// Unwraps the newtype, yielding the inner read-write provider.
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}
164
/// Controls how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SaveBlocksMode {
    /// Write everything: block structure + receipts + state + trie.
    /// Used by engine/production code.
    Full,
    /// Write only the block structure (headers, txs, senders, indices).
    /// Receipts/state/trie are skipped - they may come later via separate calls.
    /// Used by `insert_block`.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Returns `true` if this is [`SaveBlocksMode::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
183
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
    /// Storage configuration settings for this node
    storage_settings: Arc<RwLock<StorageSettings>>,
    /// `RocksDB` provider
    rocksdb_provider: RocksDBProvider,
    /// Changeset cache for trie unwinding
    changeset_cache: ChangesetCache,
    /// Task runtime for spawning parallel I/O work.
    runtime: reth_tasks::Runtime,
    /// Path to the database directory.
    db_path: PathBuf,
    /// Pending `RocksDB` batches to be committed at provider commit time.
    /// Batches are queued via `set_pending_rocksdb_batch` and drained on commit.
    pending_rocksdb_batches: PendingRocksDBBatches,
    /// Commit order for database operations.
    /// See [`CommitOrder`]: `Unwind` flips the MDBX/static-file commit ordering.
    commit_order: CommitOrder,
    /// Minimum distance from tip required for pruning
    minimum_pruning_distance: u64,
    /// Database provider metrics
    metrics: metrics::DatabaseProviderMetrics,
    /// Database handle used to inspect active MDBX readers during unwind commits.
    /// `None` unless attached via `with_reader_txn_tracker`.
    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
}
218
219impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        let mut s = f.debug_struct("DatabaseProvider");
222        s.field("tx", &self.tx)
223            .field("chain_spec", &self.chain_spec)
224            .field("static_file_provider", &self.static_file_provider)
225            .field("prune_modes", &self.prune_modes)
226            .field("storage", &self.storage)
227            .field("storage_settings", &self.storage_settings)
228            .field("rocksdb_provider", &self.rocksdb_provider)
229            .field("changeset_cache", &self.changeset_cache)
230            .field("runtime", &self.runtime)
231            .field("pending_rocksdb_batches", &"<pending batches>")
232            .field("commit_order", &self.commit_order)
233            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
234            .field("reader_txn_tracker", &"<reader txn tracker>")
235            .finish()
236    }
237}
238
impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns reference to prune modes.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }

    /// Sets the minimum pruning distance.
    ///
    /// Builder-style: consumes and returns `self`.
    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
        self.minimum_pruning_distance = distance;
        self
    }

    /// Attaches reader tracking so unwind commits can wait on active readers.
    ///
    /// The tracker is stored type-erased (`Arc<dyn ReaderTxnTracker>`) and is consulted
    /// by `commit_unwind` between the MDBX and `RocksDB` commit steps.
    pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
    where
        T: ReaderTxnTracker + 'static,
    {
        self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
        self
    }
}
260
impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Commits unwind writes in MDBX -> `RocksDB` -> static-file order.
    ///
    /// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
    /// truncating static files from checkpoints on the next startup.
    ///
    /// For `storage_v2`, this waits after the MDBX commit so readers holding older MDBX-visible
    /// views cannot overlap the `RocksDB` unwind.
    ///
    /// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
    /// so no additional post-`RocksDB` wait is needed before static-file commit.
    fn commit_unwind(self) -> ProviderResult<()> {
        // Snapshot settings/tracker before `self.tx.commit()` consumes the transaction.
        let storage_v2 = self.cached_storage_settings().storage_v2;
        let reader_txn_tracker = self.reader_txn_tracker.clone();
        // Step 1: MDBX — the first durable step (see doc comment above).
        self.tx.commit()?;

        if storage_v2 {
            // Block until readers that started before this commit have finished,
            // so none of them can observe the RocksDB unwind mid-flight.
            if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
                reader_txn_tracker.wait_for_pre_commit_readers();
            }

            // Step 2: drain and commit every queued RocksDB batch.
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
        }

        // Step 3: static files last.
        self.static_file_provider.commit()?;
        Ok(())
    }

    /// State provider for latest state
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Storage provider for state at that given block hash
    ///
    /// Returns the latest-state provider when the hash resolves to the current tip;
    /// otherwise a historical provider capped by any history prune checkpoints.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        // Fast path: the requested block is both the best and the last block, i.e. the tip.
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If we pruned account or storage history, we can't return state on every historical block.
        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            // +1: history is available only strictly after the pruned block.
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    /// Sets the prune modes for provider.
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
347
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    /// Primitive types come straight from the node type configuration.
    type Primitives = N::Primitives;
}
351
impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }

    /// Returns a mutable writer for `segment`, positioned at `block`.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.static_file_provider.get_writer(block, segment)
    }
}
366
367impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
368    /// Returns the `RocksDB` provider.
369    fn rocksdb_provider(&self) -> RocksDBProvider {
370        self.rocksdb_provider.clone()
371    }
372
373    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
374        self.pending_rocksdb_batches.lock().push(batch);
375    }
376
377    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
378        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
379        for batch in batches {
380            self.rocksdb_provider.commit_batch(batch)?;
381        }
382        Ok(())
383    }
384}
385
impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    /// Returns a shared handle to the chain spec (cheap `Arc` clone).
    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}
395
impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-write transaction.
    ///
    /// Shared constructor behind [`Self::new_rw`] and [`Self::new_unwind_rw`]; the only
    /// difference between the two public entry points is the `commit_order` argument.
    /// Fields not passed in are defaulted: no pending batches, default metrics,
    /// `MINIMUM_UNWIND_SAFE_DISTANCE` pruning distance, and no reader tracker.
    #[expect(clippy::too_many_arguments)]
    fn new_rw_inner(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
        commit_order: CommitOrder,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Creates a provider with an inner read-write transaction using normal commit order.
    #[expect(clippy::too_many_arguments)]
    pub fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Normal,
        )
    }

    /// Creates a provider with an inner read-write transaction using unwind commit order.
    ///
    /// Identical to [`Self::new_rw`] except commits run MDBX-first (see [`CommitOrder::Unwind`]).
    #[expect(clippy::too_many_arguments)]
    pub fn new_unwind_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self::new_rw_inner(
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            CommitOrder::Unwind,
        )
    }
}
489
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    /// Identity conversion; lets generic code take `impl AsRef<DatabaseProvider<..>>`.
    fn as_ref(&self) -> &Self {
        self
    }
}
495
496impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
497    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
498    ///
499    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
500    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
501    where
502        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
503    {
504        let rocksdb = self.rocksdb_provider();
505        let rocksdb_batch = rocksdb.batch();
506
507        let (result, raw_batch) = f(rocksdb_batch)?;
508
509        if let Some(batch) = raw_batch {
510            self.set_pending_rocksdb_batch(batch);
511        }
512        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
513
514        Ok(result)
515    }
516
    /// Creates the context for static file writes.
    ///
    /// Decides, per data kind, whether it goes to static files: senders/receipts/changesets
    /// are written to SF only when their configured destination is static-file, and the
    /// state-dependent kinds additionally require [`SaveBlocksMode::with_state`].
    fn static_file_write_ctx(
        &self,
        save_mode: SaveBlocksMode,
        first_block: BlockNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<StaticFileWriteCtx> {
        // The tip is at least `last_block` even if the database hasn't caught up yet.
        let tip = self.last_block_number()?.max(last_block);
        Ok(StaticFileWriteCtx {
            // Skip sender writes entirely when sender-recovery pruning is `Full`.
            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
            write_receipts: save_mode.with_state() &&
                EitherWriter::receipts_destination(self).is_static_file(),
            write_account_changesets: save_mode.with_state() &&
                EitherWriterDestination::account_changesets(self).is_static_file(),
            write_storage_changesets: save_mode.with_state() &&
                EitherWriterDestination::storage_changesets(self).is_static_file(),
            tip,
            receipts_prune_mode: self.prune_modes.receipts,
            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
            receipts_prunable: self
                .static_file_provider
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none() &&
                PruneMode::Distance(self.minimum_pruning_distance)
                    .should_prune(first_block, tip),
        })
    }
545
    /// Creates the context for `RocksDB` writes.
    ///
    /// Shares the pending-batches queue with the provider so batches produced by the
    /// `RocksDB` writer are committed together with this provider.
    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
        RocksDBWriteCtx {
            first_block_number: first_block,
            prune_tx_lookup: self.prune_modes.transaction_lookup,
            storage_settings: self.cached_storage_settings(),
            pending_batches: self.pending_rocksdb_batches.clone(),
        }
    }
555
    /// Writes executed blocks and state to storage.
    ///
    /// This method parallelizes static file (SF) writes with MDBX writes.
    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
    ///
    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
        save_mode: SaveBlocksMode,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        let total_start = Instant::now();
        let block_count = blocks.len() as u64;
        // Safe: non-empty checked above.
        let first_number = blocks.first().unwrap().recovered_block().number();
        let last_block_number = blocks.last().unwrap().recovered_block().number();

        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");

        // Compute tx_nums upfront (both threads need these)
        // Next tx number = last assigned tx number + 1 (0 for an empty database).
        let first_tx_num = self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .last()?
            .map(|(n, _)| n + 1)
            .unwrap_or_default();

        // tx_nums[i] is the first transaction number assigned to blocks[i].
        let tx_nums: Vec<TxNumber> = {
            let mut nums = Vec::with_capacity(blocks.len());
            let mut current = first_tx_num;
            for block in &blocks {
                nums.push(current);
                current += block.recovered_block().body().transaction_count() as u64;
            }
            nums
        };

        let mut timings =
            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };

        // avoid capturing &self.tx in scope below.
        let sf_provider = &self.static_file_provider;
        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
        let rocksdb_provider = self.rocksdb_provider.clone();
        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;

        // Filled by the spawned tasks; `None` after the scope means the task panicked.
        let mut sf_result = None;
        let mut rocksdb_result = None;

        // Write to all backends in parallel.
        let runtime = &self.runtime;
        // Propagate tracing context into rayon-spawned threads so that static file
        // and RocksDB write spans appear as children of save_blocks in traces.
        let span = tracing::Span::current();
        runtime.storage_pool().in_place_scope(|s| {
            // SF writes
            s.spawn(|_| {
                let _guard = span.enter();
                let start = Instant::now();
                sf_result = Some(
                    sf_provider
                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
                        .map(|()| start.elapsed()),
                );
            });

            // RocksDB writes
            if rocksdb_enabled {
                s.spawn(|_| {
                    let _guard = span.enter();
                    let start = Instant::now();
                    rocksdb_result = Some(
                        rocksdb_provider
                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
                            .map(|()| start.elapsed()),
                    );
                });
            }

            // MDBX writes (run on the current thread while the spawns proceed)
            let mdbx_start = Instant::now();

            // Collect all transaction hashes across all blocks, sort them, and write in batch
            if !self.cached_storage_settings().storage_v2 &&
                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
            {
                let start = Instant::now();
                let total_tx_count: usize =
                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
                for (i, block) in blocks.iter().enumerate() {
                    let recovered_block = block.recovered_block();
                    for (tx_num, transaction) in
                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
                    {
                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
                    }
                }

                // Sort by hash for optimal MDBX insertion performance
                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);

                // Write all transaction hash numbers in a single batch
                self.with_rocksdb_batch(|batch| {
                    let mut tx_hash_writer =
                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
                    Ok(((), raw_batch))
                })?;
                self.metrics.record_duration(
                    metrics::Action::InsertTransactionHashNumbers,
                    start.elapsed(),
                );
            }

            for (i, block) in blocks.iter().enumerate() {
                let recovered_block = block.recovered_block();

                let start = Instant::now();
                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
                timings.insert_block += start.elapsed();

                if save_mode.with_state() {
                    let execution_output = block.execution_outcome();

                    // Write state and changesets to the database.
                    // Must be written after blocks because of the receipt lookup.
                    // Skip receipts/account changesets if they're being written to static files.
                    let start = Instant::now();
                    self.write_state(
                        WriteStateInput::Single {
                            outcome: execution_output,
                            block: recovered_block.number(),
                        },
                        OriginalValuesKnown::No,
                        StateWriteConfig {
                            write_receipts: !sf_ctx.write_receipts,
                            write_account_changesets: !sf_ctx.write_account_changesets,
                            write_storage_changesets: !sf_ctx.write_storage_changesets,
                        },
                    )?;
                    timings.write_state += start.elapsed();
                }
            }

            // Write all hashed state and trie updates in single batches.
            // This reduces cursor open/close overhead from N calls to 1.
            if save_mode.with_state() {
                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
                let start = Instant::now();
                let merged_hashed_state = HashedPostStateSorted::merge_batch(
                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
                );
                if !merged_hashed_state.is_empty() {
                    self.write_hashed_state(&merged_hashed_state)?;
                }
                timings.write_hashed_state += start.elapsed();

                let start = Instant::now();
                let merged_trie =
                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
                if !merged_trie.is_empty() {
                    self.write_trie_updates_sorted(&merged_trie)?;
                }
                timings.write_trie_updates += start.elapsed();
            }

            // Full mode: update history indices
            if save_mode.with_state() {
                let start = Instant::now();
                self.update_history_indices(first_number..=last_block_number)?;
                timings.update_history_indices = start.elapsed();
            }

            // Update pipeline progress
            let start = Instant::now();
            self.update_pipeline_stages(last_block_number, false)?;
            timings.update_pipeline_stages = start.elapsed();

            timings.mdbx = mdbx_start.elapsed();

            Ok::<_, ProviderError>(())
        })?;

        // Collect results from spawned tasks
        // First `?` surfaces a panicked task (result never set); second propagates write errors.
        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;

        if rocksdb_enabled {
            timings.rocksdb = rocksdb_result.ok_or_else(|| {
                ProviderError::Database(reth_db_api::DatabaseError::Other(
                    "RocksDB thread panicked".into(),
                ))
            })??;
        }

        timings.total = total_start.elapsed();

        self.metrics.record_save_blocks(&timings);
        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }
767
    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
    ///
    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
    ///
    /// Returns the [`StoredBlockBodyIndices`] describing the contiguous tx-number range occupied
    /// by this block's transactions.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn insert_block_mdbx_only(
        &self,
        block: &RecoveredBlock<BlockTy<N>>,
        first_tx_num: TxNumber,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        // Write senders to MDBX only when sender-recovery pruning is not "full" and the
        // configured senders destination is the database (as opposed to static files).
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
            EitherWriterDestination::senders(self).is_database()
        {
            let start = Instant::now();
            // Consecutive tx numbers starting at `first_tx_num`, zipped pairwise with senders.
            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
                cursor.append(tx_num, &sender)?;
            }
            self.metrics
                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
        }

        let block_number = block.number();
        let tx_count = block.body().transaction_count() as u64;

        // Block hash -> number lookup entry.
        let start = Instant::now();
        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());

        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;

        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
    }
801
    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
    /// `Ommers`/`Withdrawals`).
    ///
    /// `first_tx_num` and `tx_count` describe the contiguous tx-number range occupied by the
    /// transactions of `body`. Entries are appended, so callers must write blocks in order.
    fn write_block_body_indices(
        &self,
        block_number: BlockNumber,
        body: &BodyTy<N>,
        first_tx_num: TxNumber,
        tx_count: u64,
    ) -> ProviderResult<()> {
        // MDBX: BlockBodyIndices
        let start = Instant::now();
        self.tx
            .cursor_write::<tables::BlockBodyIndices>()?
            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());

        // MDBX: TransactionBlocks (last tx -> block mapping); empty blocks get no entry.
        if tx_count > 0 {
            let start = Instant::now();
            self.tx
                .cursor_write::<tables::TransactionBlocks>()?
                .append(first_tx_num + tx_count - 1, &block_number)?;
            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
        }

        // MDBX: Ommers/Withdrawals
        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;

        Ok(())
    }
832
    /// Unwinds trie state starting at and including the given block.
    ///
    /// This includes calculating the resulted state root and comparing it with the parent block
    /// state root.
    ///
    /// Unwinds account/storage hashing and history indices before reverting the trie tables.
    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
        let changed_accounts = self.account_changesets_range(from..)?;

        // Unwind account hashes.
        self.unwind_account_hashing(changed_accounts.iter())?;

        // Unwind account history indices.
        self.unwind_account_history_indices(changed_accounts.iter())?;

        let changed_storages = self.storage_changesets_range(from..)?;

        // Unwind storage hashes.
        self.unwind_storage_hashing(changed_storages.iter().copied())?;

        // Unwind storage history indices.
        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // Unwind accounts/storages trie tables using the revert.
        // Get the database tip block number
        let db_tip_block = self
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            // Without a `Finish` checkpoint we cannot determine the changeset range to revert.
            // NOTE(review): `available: 0..=0` is a placeholder range here — confirm callers
            // don't rely on it describing the actual available changesets.
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: from,
                available: 0..=0,
            })?;

        // Compute (or fetch cached) trie reverts for `from..=tip` and write them, restoring the
        // trie tables to their state just before `from`.
        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
        self.write_trie_updates_sorted(&trie_revert)?;

        Ok(())
    }
870
    /// Removes receipts from all transactions starting with provided number (inclusive).
    ///
    /// Always deletes from the MDBX `Receipts` table; additionally prunes the receipts
    /// static-file segment when receipts are configured to live in static files.
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        // iterate over block body and remove receipts
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if EitherWriter::receipts_destination(self).is_static_file() {
            // Highest receipt tx number currently stored in static files (`None` when empty).
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            // Number of receipts to prune: everything in `from_tx..=static_num`. The saturating
            // subtraction yields 0 when static files end before `from_tx`.
            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
895
896    /// Writes bytecodes to MDBX.
897    fn write_bytecodes(
898        &self,
899        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
900    ) -> ProviderResult<()> {
901        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
902        for (hash, bytecode) in bytecodes {
903            bytecodes_cursor.upsert(hash, &bytecode)?;
904        }
905        Ok(())
906    }
907}
908
impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    /// Converts this provider into a state provider anchored at `block_number`.
    ///
    /// Returns the latest state provider when the requested block equals the best block, a
    /// historical provider (capped by any history prune checkpoints) for older blocks, and an
    /// error for blocks beyond the best block.
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        // NOTE(review): a failed `best_block_number()` defaults to 0 here, which would make any
        // non-zero request fail with `BlockNotExecuted` instead of surfacing the DB error.
        let best_block = self.best_block_number().unwrap_or_default();

        // Reject requests for blocks beyond the best block
        if block_number > best_block {
            return Err(ProviderError::BlockNotExecuted {
                requested: block_number,
                executed: best_block,
            });
        }

        // If requesting state at the best block, use the latest state provider
        if block_number == best_block {
            return Ok(Box::new(LatestStateProvider::new(self)));
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // If we pruned account or storage history, we can't return state on every historical block.
        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}
959
/// For a given key, unwind all history shards that contain block numbers at or above the given
/// block number.
///
/// S - Sharded key subtype.
/// T - Table to walk over.
/// C - Cursor implementation.
///
/// This function walks the entries from the given start key and deletes all shards that belong to
/// the key and contain block numbers at or above the given block number. Shards entirely below
/// the block number are preserved.
///
/// The boundary shard (the shard that spans across the block number) is removed from the database.
/// Any indices that are below the block number are filtered out and returned for reinsertion.
/// The boundary shard is returned for reinsertion (if it's not empty).
///
/// Returns the block numbers (all strictly below `block_number`) that the caller should reinsert
/// for this key, or an empty vector when nothing needs reinsertion.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Start from the given key and iterate through shards
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, break.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Always delete the current shard from the database first
        // We'll decide later what (if anything) to reinsert
        cursor.delete_current()?;

        // Get the first (lowest) block number in this shard
        // All block numbers in a shard are sorted in ascending order
        let first = list.iter().next().expect("List can't be empty");

        // Case 1: Entire shard is at or above the unwinding point
        // Keep it deleted (don't return anything for reinsertion)
        if first >= block_number {
            // Walk backwards to the previous (lower) shard for this key.
            item = cursor.prev()?;
            continue
        }
        // Case 2: This is a boundary shard (spans across the unwinding point)
        // The shard contains some blocks below and some at/above the unwinding point
        else if block_number <= sharded_key.as_ref().highest_block_number {
            // Return only the block numbers that are below the unwinding point
            // These will be reinserted to preserve the historical data
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Case 3: Entire shard is below the unwinding point
        // Return all block numbers for reinsertion (preserve entire shard)
        return Ok(list.iter().collect::<Vec<_>>())
    }

    // No shards found or all processed
    Ok(Vec::new())
}
1022
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    ///
    /// Remaining fields (pending RocksDB batches, commit order, minimum pruning distance,
    /// metrics, reader-transaction tracker) are initialized to their defaults.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
        db_path: PathBuf,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            db_path,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            // Safety margin below the tip that must remain unwindable.
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
            reader_txn_tracker: None,
        }
    }

    /// Consume `DbTx` or `DbTxMut`.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Pass `DbTx` or `DbTxMut` mutable reference.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Pass `DbTx` or `DbTxMut` immutable reference.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1077
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Fetches a single block identified by hash or number, reads its body and senders, and
    /// assembles the result via `construct_block`.
    ///
    /// Senders missing from storage are recovered from the transaction signatures. Returns
    /// `Ok(None)` when the header or body indices are absent.
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // Get the block body
        //
        // If the body indices are not found, this means that the transactions either do not exist
        // in the database yet, or they do exist but are not indexed. If they exist but are not
        // indexed, we don't have enough information to return the block anyways, so we return
        // `None`.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let transactions = if tx_range.is_empty() {
            vec![]
        } else {
            self.transactions_by_tx_range(tx_range.clone())?
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        let senders = if tx_range.is_empty() {
            vec![]
        } else {
            let known_senders: HashMap<TxNumber, Address> =
                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

            let mut senders = Vec::with_capacity(body.transactions().len());
            for (tx_num, tx) in tx_range.zip(body.transactions()) {
                match known_senders.get(&tx_num) {
                    None => {
                        // recover the sender from the transaction if not found in storage
                        let sender = tx.recover_signer_unchecked()?;
                        senders.push(sender);
                    }
                    Some(sender) => senders.push(*sender),
                }
            }
            senders
        };

        construct_block(header, body, senders)
    }

    /// Returns a range of blocks from the database.
    ///
    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
    /// construct blocks from the following inputs:
    ///     – Header
    ///     - Range of transaction numbers
    ///     – Ommers
    ///     – Withdrawals
    ///     – Senders
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize + 1;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        // If the body indices are not found, this means that the transactions either do
        // not exist in the database yet, or they do exist but are
        // not indexed. If they exist but are not indexed, we don't
        // have enough information to return the block anyways, so
        // we skip the block.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        // Pair each header with its transactions for a single batched body read.
        let mut inputs = Vec::with_capacity(present_headers.len());
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

    /// Returns a range of blocks from the database, along with the senders of each
    /// transaction in the blocks.
    ///
    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
    /// construct blocks from the following inputs:
    ///     – Header
    ///     - Transactions
    ///     – Ommers
    ///     – Withdrawals
    ///     – Senders
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                let known_senders: HashMap<TxNumber, Address> =
                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // recover the sender from the transaction if not found
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
    /// account changesets.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // iterate previous value and get plain state value to create changeset
        // The double option around `Account` represents whether the account state is known
        // (first option) and whether the account is removed (second option).
        let mut state: BundleStateInit = HashMap::default();

        // This is not working for blocks that are not at tip. as plain state is not the last
        // state of end range. We should rename the functions or add support to access
        // History state. Accessing history state can be tricky but we are not gaining
        // anything.

        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        // Changesets are processed newest-to-oldest so the oldest pre-state wins in `state`.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }

    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
    /// plain address and plain storage key.
    fn populate_bundle_state_hashed(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        // Changesets are processed newest-to-oldest so the oldest pre-state wins in `state`.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let new_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let present_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // Storage keys in changesets are plain; hash them for HashedStorages lookup.
            let hashed_storage_key = keccak256(old_storage.key);
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    let new_storage = hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .filter(|storage| storage.key == hashed_storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}
1400
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Insert history index to the database.
    ///
    /// For each updated partial key, this function retrieves the last shard from the database
    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
    /// the shards back into the database.
    ///
    /// This function is used by history indexing stages.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
        // will append duplicate entries instead of updating existing ones, causing data corruption.
        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");

        let mut cursor = self.tx.cursor_write::<T>()?;

        for (partial_key, indices) in index_updates {
            // The "open" (most recent) shard for a key is stored under `u64::MAX`.
            let last_key = sharded_key_factory(partial_key, u64::MAX);
            let mut last_shard = cursor
                .seek_exact(last_key.clone())?
                .map(|(_, list)| list)
                .unwrap_or_else(BlockNumberList::empty);

            last_shard.append(indices).map_err(ProviderError::other)?;

            // fast path: all indices fit in one shard
            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
                cursor.upsert(last_key, &last_shard)?;
                continue;
            }

            // slow path: rechunk into multiple shards
            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
            let mut chunks_peekable = chunks.into_iter().peekable();

            while let Some(chunk) = chunks_peekable.next() {
                let shard = BlockNumberList::new_pre_sorted(chunk);
                // Closed shards are keyed by their highest block number; the final shard stays
                // keyed by `u64::MAX` so future appends can find it.
                let highest_block_number = if chunks_peekable.peek().is_some() {
                    shard.iter().next_back().expect("`chunks` does not return empty list")
                } else {
                    // Insert last list with `u64::MAX`.
                    u64::MAX
                };

                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
            }
        }

        Ok(())
    }
}
1459
1460impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1461    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1462        if self.cached_storage_settings().use_hashed_state() {
1463            let hashed_address = keccak256(address);
1464            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1465        } else {
1466            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1467        }
1468    }
1469}
1470
1471impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1472    fn changed_accounts_with_range(
1473        &self,
1474        range: RangeInclusive<BlockNumber>,
1475    ) -> ProviderResult<BTreeSet<Address>> {
1476        let mut reader = EitherReader::new_account_changesets(self)?;
1477
1478        reader.changed_accounts_with_range(range)
1479    }
1480
1481    fn basic_accounts(
1482        &self,
1483        iter: impl IntoIterator<Item = Address>,
1484    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1485        if self.cached_storage_settings().use_hashed_state() {
1486            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1487            Ok(iter
1488                .into_iter()
1489                .map(|address| {
1490                    let hashed_address = keccak256(address);
1491                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1492                })
1493                .collect::<Result<Vec<_>, _>>()?)
1494        } else {
1495            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1496            Ok(iter
1497                .into_iter()
1498                .map(|address| {
1499                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1500                })
1501                .collect::<Result<Vec<_>, _>>()?)
1502        }
1503    }
1504
1505    fn changed_accounts_and_blocks_with_range(
1506        &self,
1507        range: RangeInclusive<BlockNumber>,
1508    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1509        let highest_static_block = self
1510            .static_file_provider
1511            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1512
1513        if let Some(highest) = highest_static_block &&
1514            self.cached_storage_settings().storage_v2
1515        {
1516            let start = *range.start();
1517            let static_end = (*range.end()).min(highest);
1518
1519            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1520            if start <= static_end {
1521                for block in start..=static_end {
1522                    let block_changesets = self.account_block_changeset(block)?;
1523                    for changeset in block_changesets {
1524                        changed_accounts_and_blocks
1525                            .entry(changeset.address)
1526                            .or_default()
1527                            .push(block);
1528                    }
1529                }
1530            }
1531
1532            Ok(changed_accounts_and_blocks)
1533        } else {
1534            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1535
1536            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1537                BTreeMap::new(),
1538                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1539                    let (index, account) = entry?;
1540                    accounts.entry(account.address).or_default().push(index);
1541                    Ok(accounts)
1542                },
1543            )?;
1544
1545            Ok(account_transitions)
1546        }
1547    }
1548}
1549
1550impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1551    fn storage_changeset(
1552        &self,
1553        block_number: BlockNumber,
1554    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1555        if self.cached_storage_settings().storage_v2 {
1556            self.static_file_provider.storage_changeset(block_number)
1557        } else {
1558            let range = block_number..=block_number;
1559            let storage_range = BlockNumberAddress::range(range);
1560            self.tx
1561                .cursor_dup_read::<tables::StorageChangeSets>()?
1562                .walk_range(storage_range)?
1563                .map(|r| {
1564                    let (bna, entry) = r?;
1565                    Ok((bna, entry))
1566                })
1567                .collect()
1568        }
1569    }
1570
1571    fn get_storage_before_block(
1572        &self,
1573        block_number: BlockNumber,
1574        address: Address,
1575        storage_key: B256,
1576    ) -> ProviderResult<Option<StorageEntry>> {
1577        if self.cached_storage_settings().storage_v2 {
1578            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1579        } else {
1580            Ok(self
1581                .tx
1582                .cursor_dup_read::<tables::StorageChangeSets>()?
1583                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1584                .filter(|entry| entry.key == storage_key))
1585        }
1586    }
1587
1588    fn storage_changesets_range(
1589        &self,
1590        range: impl RangeBounds<BlockNumber>,
1591    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1592        if self.cached_storage_settings().storage_v2 {
1593            self.static_file_provider.storage_changesets_range(range)
1594        } else {
1595            self.tx
1596                .cursor_dup_read::<tables::StorageChangeSets>()?
1597                .walk_range(BlockNumberAddressRange::from(range))?
1598                .map(|r| {
1599                    let (bna, entry) = r?;
1600                    Ok((bna, entry))
1601                })
1602                .collect()
1603        }
1604    }
1605}
1606
1607impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1608    fn account_block_changeset(
1609        &self,
1610        block_number: BlockNumber,
1611    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1612        if self.cached_storage_settings().storage_v2 {
1613            let static_changesets =
1614                self.static_file_provider.account_block_changeset(block_number)?;
1615            Ok(static_changesets)
1616        } else {
1617            let range = block_number..=block_number;
1618            self.tx
1619                .cursor_read::<tables::AccountChangeSets>()?
1620                .walk_range(range)?
1621                .map(|result| -> ProviderResult<_> {
1622                    let (_, account_before) = result?;
1623                    Ok(account_before)
1624                })
1625                .collect()
1626        }
1627    }
1628
1629    fn get_account_before_block(
1630        &self,
1631        block_number: BlockNumber,
1632        address: Address,
1633    ) -> ProviderResult<Option<AccountBeforeTx>> {
1634        if self.cached_storage_settings().storage_v2 {
1635            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1636        } else {
1637            self.tx
1638                .cursor_dup_read::<tables::AccountChangeSets>()?
1639                .seek_by_key_subkey(block_number, address)?
1640                .filter(|acc| acc.address == address)
1641                .map(Ok)
1642                .transpose()
1643        }
1644    }
1645
1646    fn account_changesets_range(
1647        &self,
1648        range: impl core::ops::RangeBounds<BlockNumber>,
1649    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1650        if self.cached_storage_settings().storage_v2 {
1651            self.static_file_provider.account_changesets_range(range)
1652        } else {
1653            self.tx
1654                .cursor_read::<tables::AccountChangeSets>()?
1655                .walk_range(to_range(range))?
1656                .map(|r| r.map_err(Into::into))
1657                .collect()
1658        }
1659    }
1660}
1661
impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    /// Returns the sealed header at `highest_uninterrupted_block`, first healing any
    /// database <-> static-file height mismatch left by an interrupted shutdown.
    ///
    /// If the Headers static-file segment is ahead of the database, the surplus
    /// headers are pruned and the static file is committed immediately. If it is
    /// behind, the static files are missing or corrupted and an error is returned.
    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure Headers static file is at the same height. If it's further, this
        // input execution was interrupted previously and we need to unwind the static file.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The node shutdown between an executed static file commit and before the database
            // commit, so we need to unwind the static files.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                // Prune exactly the headers written past the database height.
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Since this is a database <-> static file inconsistency, we commit the change
                // straight away.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There's either missing or corrupted files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}
1706
1707impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1708    type Header = HeaderTy<N>;
1709
1710    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1711        if let Some(num) = self.block_number(block_hash)? {
1712            Ok(self.header_by_number(num)?)
1713        } else {
1714            Ok(None)
1715        }
1716    }
1717
1718    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1719        self.static_file_provider.header_by_number(num)
1720    }
1721
1722    fn headers_range(
1723        &self,
1724        range: impl RangeBounds<BlockNumber>,
1725    ) -> ProviderResult<Vec<Self::Header>> {
1726        self.static_file_provider.headers_range(range)
1727    }
1728
1729    fn sealed_header(
1730        &self,
1731        number: BlockNumber,
1732    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1733        self.static_file_provider.sealed_header(number)
1734    }
1735
1736    fn sealed_headers_while(
1737        &self,
1738        range: impl RangeBounds<BlockNumber>,
1739        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1740    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1741        self.static_file_provider.sealed_headers_while(range, predicate)
1742    }
1743}
1744
impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    /// Returns the canonical block hash for `number`, read from static files.
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    /// Returns the canonical block hashes for the `start`/`end` bounds, read
    /// from static files.
    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}
1758
1759impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1760    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1761        let best_number = self.best_block_number()?;
1762        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1763        Ok(ChainInfo { best_hash, best_number })
1764    }
1765
1766    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1767        // The best block number is tracked via the finished stage which gets updated in the same tx
1768        // when new blocks committed
1769        Ok(self
1770            .get_stage_checkpoint(StageId::Finish)?
1771            .map(|checkpoint| checkpoint.block_number)
1772            .unwrap_or_default())
1773    }
1774
1775    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1776        self.static_file_provider.last_block_number()
1777    }
1778
1779    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1780        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1781    }
1782}
1783
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Finds a block by hash. Only canonical blocks are stored here, so any
    /// non-canonical `source` yields `None`.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Returns the block with matching number from database.
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    ///
    /// Returns an error if the requested block is below the earliest available history.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            // History below this height has been expired/pruned; reading it is an error,
            // not a miss.
            let earliest_available = self.static_file_provider.earliest_history_height();
            if number < earliest_available {
                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
            }

            let Some(header) = self.header_by_number(number)? else { return Ok(None) };

            // If the body indices are not found, this means that the transactions either do not
            // exist in the database yet, or they do exist but are not indexed.
            // If they exist but are not indexed, we don't have enough
            // information to return the block anyways, so we return `None`.
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            // Assemble the chain-specific body from the header and transactions.
            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    /// The database provider has no notion of a pending block.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// The database provider has no notion of a pending block.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block with senders with matching number or hash from database.
    ///
    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
    /// spot, and we want fast querying.**
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Delegates to the generic recovery helper, supplying how to fetch the
        // (unsealed) header and how to assemble the recovered block.
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Like [`Self::recovered_block`] but fetches the sealed header, so the
    /// resulting block carries its hash.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the blocks in `range` without sender recovery.
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Returns the blocks in `range` with recovered senders (unchecked
    /// recovery — pre-EIP-2 signatures are accepted).
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the sealed blocks in `range` with recovered senders (checked
    /// recovery, unlike [`Self::block_with_senders_range`]).
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Resolves the block number that contains transaction `id` by seeking the
    /// `TransactionBlocks` index.
    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}
1942
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Recovers transaction hashes by walking through `Transactions` table and
    /// calculating them in a parallel manner. Returned unsorted.
    ///
    /// Delegated entirely to the static-file provider.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}
1955
1956// Calculates the hash of the given transaction
1957impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1958    type Transaction = TxTy<N>;
1959
1960    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1961        self.with_rocksdb_snapshot(|rocksdb_ref| {
1962            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1963            reader.get_transaction_hash_number(tx_hash)
1964        })
1965    }
1966
1967    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1968        self.static_file_provider.transaction_by_id(id)
1969    }
1970
1971    fn transaction_by_id_unhashed(
1972        &self,
1973        id: TxNumber,
1974    ) -> ProviderResult<Option<Self::Transaction>> {
1975        self.static_file_provider.transaction_by_id_unhashed(id)
1976    }
1977
1978    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1979        if let Some(id) = self.transaction_id(hash)? {
1980            Ok(self.transaction_by_id_unhashed(id)?)
1981        } else {
1982            Ok(None)
1983        }
1984    }
1985
1986    fn transaction_by_hash_with_meta(
1987        &self,
1988        tx_hash: TxHash,
1989    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1990        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1991            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1992            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1993            let Some(sealed_header) = self.sealed_header(block_number)?
1994        {
1995            let (header, block_hash) = sealed_header.split();
1996            if let Some(block_body) = self.block_body_indices(block_number)? {
1997                // the index of the tx in the block is the offset:
1998                // len([start..tx_id])
1999                // NOTE: `transaction_id` is always `>=` the block's first
2000                // index
2001                let index = transaction_id - block_body.first_tx_num();
2002
2003                let meta = TransactionMeta {
2004                    tx_hash,
2005                    index,
2006                    block_hash,
2007                    block_number,
2008                    base_fee: header.base_fee_per_gas(),
2009                    excess_blob_gas: header.excess_blob_gas(),
2010                    timestamp: header.timestamp(),
2011                };
2012
2013                return Ok(Some((transaction, meta)))
2014            }
2015        }
2016
2017        Ok(None)
2018    }
2019
2020    fn transactions_by_block(
2021        &self,
2022        id: BlockHashOrNumber,
2023    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2024        if let Some(block_number) = self.convert_hash_or_number(id)? &&
2025            let Some(body) = self.block_body_indices(block_number)?
2026        {
2027            let tx_range = body.tx_num_range();
2028            return if tx_range.is_empty() {
2029                Ok(Some(Vec::new()))
2030            } else {
2031                self.transactions_by_tx_range(tx_range).map(Some)
2032            }
2033        }
2034        Ok(None)
2035    }
2036
2037    fn transactions_by_block_range(
2038        &self,
2039        range: impl RangeBounds<BlockNumber>,
2040    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2041        let range = to_range(range);
2042
2043        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2044            .into_iter()
2045            .map(|body| {
2046                let tx_num_range = body.tx_num_range();
2047                if tx_num_range.is_empty() {
2048                    Ok(Vec::new())
2049                } else {
2050                    self.transactions_by_tx_range(tx_num_range)
2051                }
2052            })
2053            .collect()
2054    }
2055
2056    fn transactions_by_tx_range(
2057        &self,
2058        range: impl RangeBounds<TxNumber>,
2059    ) -> ProviderResult<Vec<Self::Transaction>> {
2060        self.static_file_provider.transactions_by_tx_range(range)
2061    }
2062
2063    fn senders_by_tx_range(
2064        &self,
2065        range: impl RangeBounds<TxNumber>,
2066    ) -> ProviderResult<Vec<Address>> {
2067        if EitherWriterDestination::senders(self).is_static_file() {
2068            self.static_file_provider.senders_by_tx_range(range)
2069        } else {
2070            self.cursor_read_collect::<tables::TransactionSenders>(range)
2071        }
2072    }
2073
2074    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2075        if EitherWriterDestination::senders(self).is_static_file() {
2076            self.static_file_provider.transaction_sender(id)
2077        } else {
2078            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2079        }
2080    }
2081}
2082
impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    /// Reads the receipt for transaction `id`, preferring the static-file
    /// segment and falling back to the database table.
    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    /// Looks up a receipt by transaction hash: hash -> tx number -> receipt.
    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    /// Returns all receipts of the given block, or `None` when the block (or
    /// its body indices) is unknown. Empty blocks yield an empty vec.
    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? &&
            let Some(body) = self.block_body_indices(number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.receipts_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    /// Reads receipts for a transaction-number range, splitting the range
    /// between static files and the database as needed.
    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }

    /// Returns, per block in `block_range`, the list of its receipts.
    ///
    /// Fetches one contiguous receipt range covering all non-empty blocks and
    /// then slices it per block by `tx_count`. NOTE(review): this assumes tx
    /// numbers are contiguous across the blocks in the range; a block with
    /// missing body indices is treated as empty, which would misalign the
    /// distribution if it actually contained transactions — confirm indices
    /// are always present for non-empty blocks.
    fn receipts_by_block_range(
        &self,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
        if block_range.is_empty() {
            return Ok(Vec::new());
        }

        // collect block body indices for each block in the range
        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
        let mut block_body_indices = Vec::with_capacity(range_len);
        for block_num in block_range {
            if let Some(indices) = self.block_body_indices(block_num)? {
                block_body_indices.push(indices);
            } else {
                // use default indices for missing blocks (empty block)
                block_body_indices.push(StoredBlockBodyIndices::default());
            }
        }

        if block_body_indices.is_empty() {
            return Ok(Vec::new());
        }

        // find blocks with transactions to determine transaction range
        let non_empty_blocks: Vec<_> =
            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();

        if non_empty_blocks.is_empty() {
            // all blocks are empty
            return Ok(vec![Vec::new(); block_body_indices.len()]);
        }

        // calculate the overall transaction range
        let first_tx = non_empty_blocks[0].first_tx_num();
        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();

        // fetch all receipts in the transaction range
        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
        let mut receipts_iter = all_receipts.into_iter();

        // distribute receipts to their respective blocks
        let mut result = Vec::with_capacity(block_body_indices.len());
        for indices in &block_body_indices {
            if indices.tx_count == 0 {
                result.push(Vec::new());
            } else {
                // consume exactly this block's share of the flat receipt list
                let block_receipts =
                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
                result.push(block_receipts);
            }
        }

        Ok(result)
    }
}
2189
2190impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2191    for DatabaseProvider<TX, N>
2192{
2193    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2194        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2195    }
2196
2197    fn block_body_indices_range(
2198        &self,
2199        range: RangeInclusive<BlockNumber>,
2200    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2201        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2202    }
2203}
2204
2205impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2206    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2207        Ok(if let Some(encoded) = id.get_pre_encoded() {
2208            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2209        } else {
2210            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2211        })
2212    }
2213
2214    /// Get stage checkpoint progress.
2215    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2216        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2217    }
2218
2219    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2220        self.tx
2221            .cursor_read::<tables::StageCheckpoints>()?
2222            .walk(None)?
2223            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2224            .map_err(ProviderError::Database)
2225    }
2226}
2227
2228impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2229    /// Save stage checkpoint.
2230    fn save_stage_checkpoint(
2231        &self,
2232        id: StageId,
2233        checkpoint: StageCheckpoint,
2234    ) -> ProviderResult<()> {
2235        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2236    }
2237
2238    /// Save stage checkpoint progress.
2239    fn save_stage_checkpoint_progress(
2240        &self,
2241        id: StageId,
2242        checkpoint: Vec<u8>,
2243    ) -> ProviderResult<()> {
2244        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2245    }
2246
2247    #[instrument(level = "debug", target = "providers::db", skip_all)]
2248    fn update_pipeline_stages(
2249        &self,
2250        block_number: BlockNumber,
2251        drop_stage_checkpoint: bool,
2252    ) -> ProviderResult<()> {
2253        // iterate over all existing stages in the table and update its progress.
2254        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2255        for stage_id in StageId::ALL {
2256            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2257            cursor.upsert(
2258                stage_id.to_string(),
2259                &StageCheckpoint {
2260                    block_number,
2261                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2262                },
2263            )?;
2264        }
2265
2266        Ok(())
2267    }
2268}
2269
impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    /// Reads the current value of the requested storage slots for each address.
    ///
    /// With hashed state enabled, values come from [`tables::HashedStorages`]
    /// keyed by keccak256 of address and slot; otherwise the plain-state dup
    /// table is read directly. Slots missing from the table are returned with a
    /// zero value, so every requested key appears in the output.
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    let hashed_address = keccak256(address);
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            let hashed_key = keccak256(key);
                            // `seek_by_key_subkey` may land on a nearby subkey, so
                            // an exact-match filter is required; a missing slot
                            // yields the default (zero) value.
                            let value = hashed_storage
                                .seek_by_key_subkey(hashed_address, hashed_key)?
                                .filter(|v| v.key == hashed_key)
                                .map(|v| v.value)
                                .unwrap_or_default();
                            // Report the entry under the original (unhashed) slot key.
                            Ok(StorageEntry { key, value })
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        } else {
            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

            addresses_with_keys
                .into_iter()
                .map(|(address, storage)| {
                    storage
                        .into_iter()
                        .map(|key| -> ProviderResult<_> {
                            // Same exact-match filtering as the hashed branch;
                            // absent slots become zero-valued entries.
                            Ok(plain_storage
                                .seek_by_key_subkey(address, key)?
                                .filter(|v| v.key == key)
                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
                        })
                        .collect::<ProviderResult<Vec<_>>>()
                        .map(|storage| (address, storage))
                })
                .collect::<ProviderResult<Vec<(_, _)>>>()
        }
    }

    /// Collects, per address, the set of storage slots that changed anywhere in
    /// `range`, sourced from static-file changesets (storage v2) or the
    /// [`tables::StorageChangeSets`] dup table.
    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        if self.cached_storage_settings().storage_v2 {
            self.storage_changesets_range(range)?.into_iter().try_fold(
                BTreeMap::new(),
                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
                    accounts.entry(address).or_default().insert(storage_entry.key);
                    Ok(accounts)
                },
            )
        } else {
            self.tx
                .cursor_read::<tables::StorageChangeSets>()?
                .walk_range(BlockNumberAddress::range(range))?
                // fold all storages and save its old state so we can remove it from HashedStorage
                // it is needed as it is dup table.
                .try_fold(
                    BTreeMap::new(),
                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                        accounts.entry(address).or_default().insert(storage_entry.key);
                        Ok(accounts)
                    },
                )
        }
    }

    /// Maps each changed `(address, slot)` pair to the list of block numbers in
    /// `range` at which it changed, using the same two changeset sources as
    /// [`Self::changed_storages_with_range`].
    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        if self.cached_storage_settings().storage_v2 {
            self.storage_changesets_range(range)?.into_iter().try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number())
                    Ok(storages)
                },
            )
        } else {
            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

            let storage_changeset_lists =
                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                    BTreeMap::new(),
                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
                     entry|
                     -> ProviderResult<_> {
                        let (index, storage) = entry?;
                        storages
                            .entry((index.address(), storage.key))
                            .or_default()
                            .push(index.block_number());
                        Ok(storages)
                    },
                )?;

            Ok(storage_changeset_lists)
        }
    }
}
2385
2386impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2387    for DatabaseProvider<TX, N>
2388{
2389    type Receipt = ReceiptTy<N>;
2390
    /// Writes an execution outcome to storage: bytecodes, plain/hashed state
    /// changes, state reverts (changesets), and — when `config.write_receipts`
    /// is set — receipts.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_state<'a>(
        &self,
        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
        is_value_known: OriginalValuesKnown,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        let execution_outcome = execution_outcome.into();

        if self.cached_storage_settings().use_hashed_state() &&
            !config.write_receipts &&
            !config.write_account_changesets &&
            !config.write_storage_changesets
        {
            // In storage v2 with all outputs directed to static files, plain state and changesets
            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
            self.write_bytecodes(
                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
            )?;
            return Ok(());
        }

        let first_block = execution_outcome.first_block();
        let (plain_state, reverts) =
            execution_outcome.state().to_plain_state_and_reverts(is_value_known);

        self.write_state_reverts(reverts, first_block, config)?;
        self.write_state_changes(plain_state)?;

        // Everything below deals exclusively with receipts.
        if !config.write_receipts {
            return Ok(());
        }

        let block_count = execution_outcome.len() as u64;
        let last_block = execution_outcome.last_block();
        let block_range = first_block..=last_block;

        // The outcome may extend past the currently stored last block, so the
        // pruning tip is the max of the two.
        let tip = self.last_block_number()?.max(last_block);

        // Fetch the first transaction number for each block in the range
        let block_indices: Vec<_> = self
            .block_body_indices_range(block_range)?
            .into_iter()
            .map(|b| b.first_tx_num)
            .collect();

        // Ensure all expected blocks are present.
        if block_indices.len() < block_count as usize {
            let missing_blocks = block_count - block_indices.len() as u64;
            // Report the first block number whose body indices are missing.
            return Err(ProviderError::BlockBodyIndicesNotFound(
                last_block.saturating_sub(missing_blocks - 1),
            ));
        }

        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;

        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;

        // All receipts from the last 128 blocks are required for blockchain tree, even with
        // [`PruneSegment::ContractLogs`].
        //
        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
        // files yet. Once receipts exist in static files, we must continue writing to maintain
        // continuity and have no gaps.
        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
            self.static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none()) &&
            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);

        // Prepare set of addresses which logs should not be pruned.
        let mut allowed_addresses: AddressSet = AddressSet::default();
        for (_, addresses) in contract_log_pruner.range(..first_block) {
            allowed_addresses.extend(addresses.iter().copied());
        }

        for (idx, (receipts, first_tx_index)) in
            execution_outcome.receipts().zip(block_indices).enumerate()
        {
            let block_number = first_block + idx as u64;

            // Increment block number for receipts static file writer
            receipts_writer.increment_block(block_number)?;

            // Skip writing receipts if pruning configuration requires us to.
            if prunable_receipts &&
                self.prune_modes
                    .receipts
                    .is_some_and(|mode| mode.should_prune(block_number, tip))
            {
                continue
            }

            // If there are new addresses to retain after this block number, track them
            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
                allowed_addresses.extend(new_addresses.iter().copied());
            }

            for (idx, receipt) in receipts.iter().enumerate() {
                // Receipts are keyed by global transaction number.
                let receipt_idx = first_tx_index + idx as u64;
                // Skip writing receipt if log filter is active and it does not have any logs to
                // retain
                if prunable_receipts &&
                    has_contract_log_filter &&
                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
                {
                    continue
                }

                receipts_writer.append_receipt(receipt_idx, receipt)?;
            }
        }

        Ok(())
    }
2509
    /// Writes state reverts (storage and account changesets) for each block in
    /// the outcome, starting at `first_block`, gated by the flags in `config`.
    fn write_state_reverts(
        &self,
        reverts: PlainStateReverts,
        first_block: BlockNumber,
        config: StateWriteConfig,
    ) -> ProviderResult<()> {
        // Write storage changes
        if config.write_storage_changesets {
            tracing::trace!("Writing storage changes");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            // `reverts.storage` holds one Vec of reverts per block, in block order
            // (block number is derived from the enumeration index below).
            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
                let block_number = first_block + block_index as BlockNumber;

                tracing::trace!(block_number, "Writing block change");
                // sort changes by address.
                storage_changes.par_sort_unstable_by_key(|a| a.address);
                let total_changes =
                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
                let mut changeset = Vec::with_capacity(total_changes);
                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
                    let mut storage = storage_revert
                        .into_iter()
                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
                        .collect::<Vec<_>>();
                    // sort storage slots by key.
                    storage.par_sort_unstable_by_key(|a| a.0);

                    // If we are writing the primary storage wipe transition, the pre-existing
                    // storage state has to be taken from the database and written to storage
                    // history. See [StorageWipe::Primary] for more details.
                    //
                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
                    // require collecting wiped entries into a Vec like this, see
                    // `write_storage_trie_changesets`.
                    let mut wiped_storage = Vec::new();
                    if wiped {
                        tracing::trace!(?address, "Wiping storage");
                        // Collect every existing dup entry for this address so the
                        // wiped values are preserved in the changeset.
                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
                            wiped_storage.push((entry.key, entry.value));
                            while let Some(entry) = storages_cursor.next_dup_val()? {
                                wiped_storage.push((entry.key, entry.value))
                            }
                        }
                    }

                    tracing::trace!(?address, ?storage, "Writing storage reverts");
                    // Merge the explicit reverts with the wiped entries (both
                    // sorted by slot key) into a single changeset for this block.
                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
                        changeset.push(StorageBeforeTx { address, key, value });
                    }
                }

                let mut storage_changesets_writer =
                    EitherWriter::new_storage_changesets(self, block_number)?;
                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
            }
        }

        if !config.write_account_changesets {
            return Ok(());
        }

        // Write account changes
        tracing::trace!(?first_block, "Writing account changes");
        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
            let block_number = first_block + block_index as BlockNumber;
            // `info` is the pre-state of the account; `None` means it did not exist.
            let changeset = account_block_reverts
                .into_iter()
                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
                .collect::<Vec<_>>();
            let mut account_changesets_writer =
                EitherWriter::new_account_changesets(self, block_number)?;

            account_changesets_writer.append_account_changeset(block_number, changeset)?;
        }

        Ok(())
    }
2588
    /// Writes the post-execution plain state (accounts, storage, bytecodes).
    ///
    /// When hashed state is in use, only bytecodes are written here; account and
    /// storage state is maintained in the hashed tables instead.
    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
        // sort all entries so they can be written to database in more performant way.
        // and take smaller memory footprint.
        changes.accounts.par_sort_by_key(|a| a.0);
        changes.storage.par_sort_by_key(|a| a.address);
        changes.contracts.par_sort_by_key(|a| a.0);

        if !self.cached_storage_settings().use_hashed_state() {
            // Write new account state
            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            // write account to database.
            for (address, account) in changes.accounts {
                if let Some(account) = account {
                    tracing::trace!(?address, "Updating plain state account");
                    accounts_cursor.upsert(address, &account.into())?;
                } else if accounts_cursor.seek_exact(address)?.is_some() {
                    // `None` marks a destroyed account: remove it if present.
                    tracing::trace!(?address, "Deleting plain state account");
                    accounts_cursor.delete_current()?;
                }
            }

            // Write new storage state and wipe storage if needed.
            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
            let mut storages_cursor =
                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
                // Wiping of storage.
                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
                    storages_cursor.delete_current_duplicates()?;
                }
                // cast storages to B256.
                let mut storage = storage
                    .into_iter()
                    .map(|(k, value)| StorageEntry { key: k.into(), value })
                    .collect::<Vec<_>>();
                // sort storage slots by key.
                storage.par_sort_unstable_by_key(|a| a.key);

                for entry in storage {
                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
                    // Delete the existing entry for this slot (exact key match
                    // only) before inserting, so the old value is not retained
                    // alongside the new one in the dup table.
                    if let Some(db_entry) =
                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
                        db_entry.key == entry.key
                    {
                        storages_cursor.delete_current()?;
                    }

                    // Zero values are represented by absence of the entry.
                    if !entry.value.is_zero() {
                        storages_cursor.upsert(address, &entry)?;
                    }
                }
            }
        }

        // Write bytecode
        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
        self.write_bytecodes(
            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
        )?;

        Ok(())
    }
2652
    /// Writes a sorted hashed post-state into the hashed account and storage
    /// tables. Wiped storages are cleared first; zero values delete entries.
    #[instrument(level = "debug", target = "providers::db", skip_all)]
    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
        // Write hashed account updates.
        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in hashed_state.accounts() {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                // `None` marks a destroyed account: remove it if present.
                hashed_accounts_cursor.delete_current()?;
            }
        }

        // Write hashed storage changes.
        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
        let mut hashed_storage_cursor =
            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
        for (hashed_address, storage) in sorted_storages {
            // A wiped storage drops all existing dup entries before re-insertion.
            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in storage.storage_slots_ref() {
                let entry = StorageEntry { key: *hashed_slot, value: *value };

                // Delete the existing entry for this slot (exact key match only)
                // before inserting the new value.
                if let Some(db_entry) =
                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
                    db_entry.key == entry.key
                {
                    hashed_storage_cursor.delete_current()?;
                }

                // Zero values are represented by absence of the entry.
                if !entry.value.is_zero() {
                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
                }
            }
        }

        Ok(())
    }
2692
    /// Remove the last N blocks of state.
    ///
    /// The latest state will be unwound
    ///
    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
    ///    transaction ids.
    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
    ///    the changesets.
    ///    - In order to have both the old and new values in the changesets, we also access the
    ///      plain state tables.
    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
    ///    we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the plain state
    ///     3. Save the old value to the local state
    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
    ///    have seen before we:
    ///     1. Take the old value from the changeset
    ///     2. Take the new value from the local state
    ///     3. Set the local state to the value in the changeset
    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
        // Everything strictly above `block` is unwound.
        let range = block + 1..=self.last_block_number()?;

        if range.is_empty() {
            return Ok(());
        }

        // We are not removing block meta as it is used to get block changesets.
        let block_bodies = self.block_body_indices_range(range.clone())?;

        // get transaction receipts
        let from_transaction_num =
            block_bodies.first().expect("already checked if there are blocks").first_tx_num();

        let storage_range = BlockNumberAddress::range(range.clone());
        // In storage v2 the changesets live in static files: read them out, then
        // prune the static files down to `block`. Otherwise drain the MDBX table.
        //
        // NOTE(review): unlike `take_state_above`, this branch does not first check
        // that the changeset static-file segment exists — confirm storage_v2
        // guarantees its presence here.
        let storage_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.storage_changesets_range(range.clone())?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
            changeset_writer.prune_storage_changesets(block)?;
            changesets
        } else {
            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
        };
        let account_changeset = if self.cached_storage_settings().storage_v2 {
            let changesets = self.account_changesets_range(range)?;
            let mut changeset_writer =
                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
            changeset_writer.prune_account_changesets(block)?;
            changesets
        } else {
            self.take::<tables::AccountChangeSets>(range)?
        };

        if self.cached_storage_settings().use_hashed_state() {
            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;

            // Reconstruct the pre-unwind state from the changesets (algorithm in
            // the doc comment above); the reverts half is not needed here.
            let (state, _) = self.populate_bundle_state_hashed(
                account_changeset,
                storage_changeset,
                &mut hashed_accounts_cursor,
                &mut hashed_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let hashed_address = keccak256(address);
                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
                    if let Some(account) = old_account {
                        hashed_accounts_cursor.upsert(hashed_address, account)?;
                    } else if existing_entry.is_some() {
                        // The account did not exist before the unwound range.
                        hashed_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let hashed_address = keccak256(address);
                    let hashed_storage_key = keccak256(storage_key);
                    let storage_entry =
                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
                    // Delete the current dup entry for this slot before restoring
                    // the old value; zero old values stay deleted.
                    if hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
                        .is_some_and(|s| s.key == hashed_storage_key)
                    {
                        hashed_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
                    }
                }
            }
        } else {
            // This is not working for blocks that are not at tip. as plain state is not the last
            // state of end range. We should rename the functions or add support to access
            // History state. Accessing history state can be tricky but we are not gaining
            // anything.
            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
            let mut plain_storage_cursor =
                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;

            let (state, _) = self.populate_bundle_state(
                account_changeset,
                storage_changeset,
                &mut plain_accounts_cursor,
                &mut plain_storage_cursor,
            )?;

            for (address, (old_account, new_account, storage)) in &state {
                if old_account != new_account {
                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
                    if let Some(account) = old_account {
                        plain_accounts_cursor.upsert(*address, account)?;
                    } else if existing_entry.is_some() {
                        plain_accounts_cursor.delete_current()?;
                    }
                }

                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
                    let storage_entry =
                        StorageEntry { key: *storage_key, value: *old_storage_value };
                    if plain_storage_cursor
                        .seek_by_key_subkey(*address, *storage_key)?
                        .is_some_and(|s| s.key == *storage_key)
                    {
                        plain_storage_cursor.delete_current()?
                    }

                    if !old_storage_value.is_zero() {
                        plain_storage_cursor.upsert(*address, &storage_entry)?;
                    }
                }
            }
        }

        // Drop receipts from the first transaction of the first unwound block.
        self.remove_receipts_from(from_transaction_num, block)?;

        Ok(())
    }
2834
2835    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2836    ///
2837    /// The latest state will be unwound and returned back with all the blocks
2838    ///
2839    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2840    ///    transaction ids.
2841    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2842    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2843    ///    the changesets.
2844    ///    - In order to have both the old and new values in the changesets, we also access the
2845    ///      plain state tables.
2846    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2847    ///    we:
2848    ///     1. Take the old value from the changeset
2849    ///     2. Take the new value from the plain state
2850    ///     3. Save the old value to the local state
2851    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2852    ///    have seen before we:
2853    ///     1. Take the old value from the changeset
2854    ///     2. Take the new value from the local state
2855    ///     3. Set the local state to the value in the changeset
2856    fn take_state_above(
2857        &self,
2858        block: BlockNumber,
2859    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2860        let range = block + 1..=self.last_block_number()?;
2861
2862        if range.is_empty() {
2863            return Ok(ExecutionOutcome::default())
2864        }
2865        let start_block_number = *range.start();
2866
2867        // We are not removing block meta as it is used to get block changesets.
2868        let block_bodies = self.block_body_indices_range(range.clone())?;
2869
2870        // get transaction receipts
2871        let from_transaction_num =
2872            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2873        let to_transaction_num =
2874            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2875
2876        let storage_range = BlockNumberAddress::range(range.clone());
2877        let storage_changeset = if let Some(highest_block) = self
2878            .static_file_provider
2879            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2880            self.cached_storage_settings().storage_v2
2881        {
2882            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2883            let mut changeset_writer =
2884                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2885            changeset_writer.prune_storage_changesets(block)?;
2886            changesets
2887        } else {
2888            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2889        };
2890
2891        // if there are static files for this segment, prune them.
2892        let highest_changeset_block = self
2893            .static_file_provider
2894            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2895        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2896            self.cached_storage_settings().storage_v2
2897        {
2898            // TODO: add a `take` method that removes and returns the items instead of doing this
2899            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2900            let mut changeset_writer =
2901                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2902            changeset_writer.prune_account_changesets(block)?;
2903
2904            changesets
2905        } else {
2906            // Have to remove from static files if they exist, otherwise remove using `take` for the
2907            // changeset tables
2908            self.take::<tables::AccountChangeSets>(range)?
2909        };
2910
2911        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2912            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2913            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2914
2915            let (state, reverts) = self.populate_bundle_state_hashed(
2916                account_changeset,
2917                storage_changeset,
2918                &mut hashed_accounts_cursor,
2919                &mut hashed_storage_cursor,
2920            )?;
2921
2922            for (address, (old_account, new_account, storage)) in &state {
2923                if old_account != new_account {
2924                    let hashed_address = keccak256(address);
2925                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2926                    if let Some(account) = old_account {
2927                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2928                    } else if existing_entry.is_some() {
2929                        hashed_accounts_cursor.delete_current()?;
2930                    }
2931                }
2932
2933                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2934                    let hashed_address = keccak256(address);
2935                    let hashed_storage_key = keccak256(storage_key);
2936                    let storage_entry =
2937                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2938                    if hashed_storage_cursor
2939                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2940                        .is_some_and(|s| s.key == hashed_storage_key)
2941                    {
2942                        hashed_storage_cursor.delete_current()?
2943                    }
2944
2945                    if !old_storage_value.is_zero() {
2946                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2947                    }
2948                }
2949            }
2950
2951            (state, reverts)
2952        } else {
2953            // This is not working for blocks that are not at tip. as plain state is not the last
2954            // state of end range. We should rename the functions or add support to access
2955            // History state. Accessing history state can be tricky but we are not gaining
2956            // anything.
2957            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2958            let mut plain_storage_cursor =
2959                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2960
2961            let (state, reverts) = self.populate_bundle_state(
2962                account_changeset,
2963                storage_changeset,
2964                &mut plain_accounts_cursor,
2965                &mut plain_storage_cursor,
2966            )?;
2967
2968            for (address, (old_account, new_account, storage)) in &state {
2969                if old_account != new_account {
2970                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2971                    if let Some(account) = old_account {
2972                        plain_accounts_cursor.upsert(*address, account)?;
2973                    } else if existing_entry.is_some() {
2974                        plain_accounts_cursor.delete_current()?;
2975                    }
2976                }
2977
2978                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2979                    let storage_entry =
2980                        StorageEntry { key: *storage_key, value: *old_storage_value };
2981                    if plain_storage_cursor
2982                        .seek_by_key_subkey(*address, *storage_key)?
2983                        .is_some_and(|s| s.key == *storage_key)
2984                    {
2985                        plain_storage_cursor.delete_current()?
2986                    }
2987
2988                    if !old_storage_value.is_zero() {
2989                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2990                    }
2991                }
2992            }
2993
2994            (state, reverts)
2995        };
2996
2997        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2998        let mut receipts_iter = self
2999            .static_file_provider
3000            .get_range_with_static_file_or_database(
3001                StaticFileSegment::Receipts,
3002                from_transaction_num..to_transaction_num + 1,
3003                |static_file, range, _| {
3004                    static_file
3005                        .receipts_by_tx_range(range.clone())
3006                        .map(|r| range.into_iter().zip(r).collect())
3007                },
3008                |range, _| {
3009                    self.tx
3010                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
3011                        .walk_range(range)?
3012                        .map(|r| r.map_err(Into::into))
3013                        .collect()
3014                },
3015                |_| true,
3016            )?
3017            .into_iter()
3018            .peekable();
3019
3020        let mut receipts = Vec::with_capacity(block_bodies.len());
3021        // loop break if we are at the end of the blocks.
3022        for block_body in block_bodies {
3023            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3024            for num in block_body.tx_num_range() {
3025                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3026                    block_receipts.push(receipts_iter.next().unwrap().1);
3027                }
3028            }
3029            receipts.push(block_receipts);
3030        }
3031
3032        self.remove_receipts_from(from_transaction_num, block)?;
3033
3034        Ok(ExecutionOutcome::new_init(
3035            state,
3036            reverts,
3037            Vec::new(),
3038            receipts,
3039            start_block_number,
3040            Vec::new(),
3041        ))
3042    }
3043}
3044
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Writes sorted account trie node updates to the account trie table selected by the
    /// adapter `A`, incrementing `num_entries` for each processed update.
    ///
    /// `Some(node)` upserts the node; `None` deletes it if present. Empty keys are skipped
    /// on the upsert path — presumably the trie root is tracked elsewhere (TODO confirm).
    /// Note that `num_entries` counts processed delete updates even when no row existed.
    fn write_account_trie_updates<A: TrieTableAdapter>(
        tx: &TX,
        trie_updates: &TrieUpdatesSorted,
        num_entries: &mut usize,
    ) -> ProviderResult<()>
    where
        TX: DbTxMut,
    {
        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
        // Process sorted account nodes
        for (key, updated_node) in trie_updates.account_nodes_ref() {
            // Convert the nibble path into the adapter-specific key type.
            let nibbles = A::AccountKey::from(*key);
            match updated_node {
                Some(node) => {
                    if !key.is_empty() {
                        *num_entries += 1;
                        account_trie_cursor.upsert(nibbles, node)?;
                    }
                }
                None => {
                    *num_entries += 1;
                    // Position the cursor on the node first; only delete when it exists.
                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
                        account_trie_cursor.delete_current()?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Writes per-account storage trie updates through a single dup-sorted cursor, adding
    /// the number of written entries to `num_entries`.
    ///
    /// The cursor is moved into a [`DatabaseStorageTrieCursor`] wrapper for each account
    /// and recovered afterwards, so all accounts reuse one cursor allocation.
    fn write_storage_tries<A: TrieTableAdapter>(
        tx: &TX,
        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
        num_entries: &mut usize,
    ) -> ProviderResult<()>
    where
        TX: DbTxMut,
    {
        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
        for (hashed_address, storage_trie_updates) in storage_tries {
            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
            *num_entries +=
                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
            // Take the raw cursor back out of the wrapper for the next account.
            cursor = db_storage_trie_cursor.cursor;
        }
        Ok(())
    }
}
3095
3096impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3097    /// Writes trie updates to the database with already sorted updates.
3098    ///
3099    /// Returns the number of entries modified.
3100    #[instrument(level = "debug", target = "providers::db", skip_all)]
3101    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3102        if trie_updates.is_empty() {
3103            return Ok(0)
3104        }
3105
3106        // Track the number of inserted entries.
3107        let mut num_entries = 0;
3108
3109        reth_trie_db::with_adapter!(self, |A| {
3110            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3111        });
3112
3113        num_entries +=
3114            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3115
3116        Ok(num_entries)
3117    }
3118}
3119
3120impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3121    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3122    ///
3123    /// Expects the storage trie updates to already be sorted by the hashed address key.
3124    ///
3125    /// Returns the number of entries modified.
3126    fn write_storage_trie_updates_sorted<'a>(
3127        &self,
3128        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3129    ) -> ProviderResult<usize> {
3130        let mut num_entries = 0;
3131        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3132        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3133        reth_trie_db::with_adapter!(self, |A| {
3134            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3135        });
3136        Ok(num_entries)
3137    }
3138}
3139
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
    /// Reverts `HashedAccounts` to the pre-changeset state described by the given account
    /// changesets.
    ///
    /// Returns the map of hashed address to the account value that was applied; `None`
    /// means the account entry was removed.
    fn unwind_account_hashing<'a>(
        &self,
        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        // Aggregate all block changesets and make a list of accounts that have been changed.
        // Note that collecting and then reversing the order is necessary to ensure that the
        // changes are applied in the correct order.
        let hashed_accounts = changesets
            .into_iter()
            .map(|(_, e)| (keccak256(e.address), e.info))
            .collect::<Vec<_>>()
            .into_iter()
            .rev()
            .collect::<BTreeMap<_, _>>();

        // Apply values to HashedState, and remove the account if it's None.
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                // Pre-state had no account — drop the entry if one currently exists.
                hashed_accounts_cursor.delete_current()?;
            }
        }

        Ok(hashed_accounts)
    }

    /// Convenience wrapper: loads the account changesets for `range` and unwinds hashing
    /// with them.
    fn unwind_account_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let changesets = self.account_changesets_range(range)?;
        self.unwind_account_hashing(changesets.iter())
    }

    /// Hashes the given account updates and applies them to `HashedAccounts`.
    ///
    /// `None` removes the account entry. Returns the hashed address -> account map that
    /// was written.
    fn insert_account_for_hashing(
        &self,
        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
        // Later entries for the same address overwrite earlier ones in the BTreeMap.
        let hashed_accounts =
            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
        for (hashed_address, account) in &hashed_accounts {
            if let Some(account) = account {
                hashed_accounts_cursor.upsert(*hashed_address, account)?;
            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
                hashed_accounts_cursor.delete_current()?;
            }
        }
        Ok(hashed_accounts)
    }

    /// Reverts `HashedStorages` to the pre-changeset state described by the given storage
    /// changesets.
    ///
    /// Returns, per hashed address, the set of hashed storage keys that were touched.
    fn unwind_storage_hashing(
        &self,
        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // Aggregate all block changesets and make list of accounts that have been changed.
        let mut hashed_storages = changesets
            .into_iter()
            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
                let hashed_key = keccak256(storage_entry.key);
                (keccak256(address), hashed_key, storage_entry.value)
            })
            .collect::<Vec<_>>();
        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));

        // Apply values to HashedState, and remove the account if it's None.
        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Iterate in reverse so that, for duplicate (address, key) pairs, the entry written
        // last is the one from the earliest changeset (the oldest pre-state).
        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
            hashed_storage_keys.entry(hashed_address).or_default().insert(key);

            // Dup-sorted table: delete the existing subkey entry (if any) before upserting,
            // guarding against `seek_by_key_subkey` landing on a different subkey.
            if hashed_storage
                .seek_by_key_subkey(hashed_address, key)?
                .is_some_and(|entry| entry.key == key)
            {
                hashed_storage.delete_current()?;
            }

            // Zero values are represented by the absence of an entry.
            if !value.is_zero() {
                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
            }
        }
        Ok(hashed_storage_keys)
    }

    /// Convenience wrapper: loads the storage changesets for `range` and unwinds hashing
    /// with them.
    fn unwind_storage_hashing_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        let changesets = self.storage_changesets_range(range)?;
        self.unwind_storage_hashing(changesets.into_iter())
    }

    /// Hashes the given storage entries and applies them to `HashedStorages`.
    ///
    /// Zero values remove the slot. Returns, per hashed address, the set of hashed storage
    /// keys that were written.
    fn insert_storage_for_hashing(
        &self,
        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
        // hash values
        let hashed_storages =
            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
                    map.insert(keccak256(entry.key), entry.value);
                    map
                });
                map.insert(keccak256(address), storage);
                map
            });

        // Record which hashed keys are touched per hashed address (the return value).
        let hashed_storage_keys = hashed_storages
            .iter()
            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
            .collect();

        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
        // Hash the address and key and apply them to HashedStorage (if Storage is None
        // just remove it);
        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
                // Delete any existing entry for this exact subkey before upserting.
                if hashed_storage_cursor
                    .seek_by_key_subkey(hashed_address, key)?
                    .is_some_and(|entry| entry.key == key)
                {
                    hashed_storage_cursor.delete_current()?;
                }

                if !value.is_zero() {
                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
                }
                Ok(())
            })
        })?;

        Ok(hashed_storage_keys)
    }
}
3279
3280impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3281    fn unwind_account_history_indices<'a>(
3282        &self,
3283        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3284    ) -> ProviderResult<usize> {
3285        let mut last_indices = changesets
3286            .into_iter()
3287            .map(|(index, account)| (account.address, *index))
3288            .collect::<Vec<_>>();
3289        last_indices.sort_unstable_by_key(|(a, _)| *a);
3290
3291        if self.cached_storage_settings().storage_v2 {
3292            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3293            self.pending_rocksdb_batches.lock().push(batch);
3294        } else {
3295            // Unwind the account history index in MDBX.
3296            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3297            for &(address, rem_index) in &last_indices {
3298                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3299                    &mut cursor,
3300                    ShardedKey::last(address),
3301                    rem_index,
3302                    |sharded_key| sharded_key.key == address,
3303                )?;
3304
3305                // Check the last returned partial shard.
3306                // If it's not empty, the shard needs to be reinserted.
3307                if !partial_shard.is_empty() {
3308                    cursor.insert(
3309                        ShardedKey::last(address),
3310                        &BlockNumberList::new_pre_sorted(partial_shard),
3311                    )?;
3312                }
3313            }
3314        }
3315
3316        let changesets = last_indices.len();
3317        Ok(changesets)
3318    }
3319
3320    fn unwind_account_history_indices_range(
3321        &self,
3322        range: impl RangeBounds<BlockNumber>,
3323    ) -> ProviderResult<usize> {
3324        let changesets = self.account_changesets_range(range)?;
3325        self.unwind_account_history_indices(changesets.iter())
3326    }
3327
3328    fn insert_account_history_index(
3329        &self,
3330        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3331    ) -> ProviderResult<()> {
3332        self.append_history_index::<_, tables::AccountsHistory>(
3333            account_transitions,
3334            ShardedKey::new,
3335        )
3336    }
3337
3338    fn unwind_storage_history_indices(
3339        &self,
3340        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3341    ) -> ProviderResult<usize> {
3342        let mut storage_changesets = changesets
3343            .into_iter()
3344            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3345            .collect::<Vec<_>>();
3346        storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3347
3348        if self.cached_storage_settings().storage_v2 {
3349            let batch =
3350                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3351            self.pending_rocksdb_batches.lock().push(batch);
3352        } else {
3353            // Unwind the storage history index in MDBX.
3354            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3355            for &(address, storage_key, rem_index) in &storage_changesets {
3356                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3357                    &mut cursor,
3358                    StorageShardedKey::last(address, storage_key),
3359                    rem_index,
3360                    |storage_sharded_key| {
3361                        storage_sharded_key.address == address &&
3362                            storage_sharded_key.sharded_key.key == storage_key
3363                    },
3364                )?;
3365
3366                // Check the last returned partial shard.
3367                // If it's not empty, the shard needs to be reinserted.
3368                if !partial_shard.is_empty() {
3369                    cursor.insert(
3370                        StorageShardedKey::last(address, storage_key),
3371                        &BlockNumberList::new_pre_sorted(partial_shard),
3372                    )?;
3373                }
3374            }
3375        }
3376
3377        let changesets = storage_changesets.len();
3378        Ok(changesets)
3379    }
3380
3381    fn unwind_storage_history_indices_range(
3382        &self,
3383        range: impl RangeBounds<BlockNumber>,
3384    ) -> ProviderResult<usize> {
3385        let changesets = self.storage_changesets_range(range)?;
3386        self.unwind_storage_history_indices(changesets.into_iter())
3387    }
3388
3389    fn insert_storage_history_index(
3390        &self,
3391        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3392    ) -> ProviderResult<()> {
3393        self.append_history_index::<_, tables::StoragesHistory>(
3394            storage_transitions,
3395            |(address, storage_key), highest_block_number| {
3396                StorageShardedKey::new(address, storage_key, highest_block_number)
3397            },
3398        )
3399    }
3400
3401    #[instrument(level = "debug", target = "providers::db", skip_all)]
3402    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3403        let storage_settings = self.cached_storage_settings();
3404        if !storage_settings.storage_v2 {
3405            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3406            self.insert_account_history_index(indices)?;
3407        }
3408
3409        if !storage_settings.storage_v2 {
3410            let indices = self.changed_storages_and_blocks_with_range(range)?;
3411            self.insert_storage_history_index(indices)?;
3412        }
3413
3414        Ok(())
3415    }
3416}
3417
impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
    for DatabaseProvider<TX, N>
{
    /// Removes all blocks above `block` from the database and returns them, together with
    /// their execution state, as a [`Chain`].
    ///
    /// Order matters here: the unwind range and the blocks/state are read *before* the
    /// block bodies are removed.
    fn take_block_and_execution_above(
        &self,
        block: BlockNumber,
    ) -> ProviderResult<Chain<Self::Primitives>> {
        // Everything strictly above `block`, up to the current tip.
        let range = block + 1..=self.last_block_number()?;

        self.unwind_trie_state_from(block + 1)?;

        // get execution res
        let execution_state = self.take_state_above(block)?;

        let blocks = self.recovered_block_range(range)?;

        // remove block bodies it is needed for both get block range and get block execution results
        // that is why it is deleted afterwards.
        self.remove_blocks_above(block)?;

        // Update pipeline progress
        self.update_pipeline_stages(block, true)?;

        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
    }

    /// Removes all blocks above `block` along with their execution state, discarding the
    /// removed data.
    ///
    /// Same sequence as [`Self::take_block_and_execution_above`], minus reading the blocks
    /// back out.
    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.unwind_trie_state_from(block + 1)?;

        // remove execution res
        self.remove_state_above(block)?;

        // remove block bodies it is needed for both get block range and get block execution results
        // that is why it is deleted afterwards.
        self.remove_blocks_above(block)?;

        // Update pipeline progress
        self.update_pipeline_stages(block, true)?;

        Ok(())
    }
}
3460
3461impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3462    for DatabaseProvider<TX, N>
3463{
3464    type Block = BlockTy<N>;
3465    type Receipt = ReceiptTy<N>;
3466
3467    /// Inserts the block into the database, writing to both static files and MDBX.
3468    ///
3469    /// This is a convenience method primarily used in tests. For production use,
3470    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3471    fn insert_block(
3472        &self,
3473        block: &RecoveredBlock<Self::Block>,
3474    ) -> ProviderResult<StoredBlockBodyIndices> {
3475        let block_number = block.number();
3476
3477        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3478        let executed_block = ExecutedBlock::new(
3479            Arc::new(block.clone()),
3480            Arc::new(BlockExecutionOutput {
3481                result: BlockExecutionResult {
3482                    receipts: Default::default(),
3483                    requests: Default::default(),
3484                    gas_used: 0,
3485                    blob_gas_used: 0,
3486                },
3487                state: Default::default(),
3488            }),
3489            ComputedTrieData::default(),
3490        );
3491
3492        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3493        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3494
3495        // Return the body indices
3496        self.block_body_indices(block_number)?
3497            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3498    }
3499
    /// Appends block bodies (and their transactions) for consecutive block numbers.
    ///
    /// Writes transactions to the static-file writer, and body indices / tx-to-block
    /// mappings to MDBX. `None` bodies are treated as empty (zero transactions) but still
    /// advance the static-file block and get body indices. Chain-specific body parts are
    /// delegated to `self.storage.writer()` at the end.
    fn append_block_bodies(
        &self,
        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
    ) -> ProviderResult<()> {
        // Nothing to do for an empty input.
        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };

        // Initialize writer if we will be writing transactions to staticfiles
        let mut tx_writer =
            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;

        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;

        // Get id for the next tx_num or zero if there are no transactions.
        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

        for (block_number, body) in &bodies {
            // Increment block on static file header.
            tx_writer.increment_block(*block_number)?;

            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);

            // insert block meta
            block_indices_cursor.append(*block_number, &block_indices)?;

            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

            // Empty bodies get indices above but nothing else.
            let Some(body) = body else { continue };

            // write transaction block index
            if !body.transactions().is_empty() {
                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
            }

            // write transactions
            for transaction in body.transactions() {
                tx_writer.append_transaction(next_tx_num, transaction)?;

                // Increment transaction id for each transaction.
                next_tx_num += 1;
            }
        }

        // Chain-specific body content (e.g. ommers/withdrawals) is written by the storage
        // implementation — presumably varies per chain; see `ChainStorage`.
        self.storage.writer().write_block_bodies(self, bodies)?;

        Ok(())
    }
3551
    /// Removes all blocks strictly above `block`: header number mappings, static-file
    /// headers, transaction hash indices, senders, and finally the bodies.
    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
        let last_block_number = self.last_block_number()?;
        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
        }

        // Get highest static file block for the total block range
        let highest_static_file_block = self
            .static_file_provider()
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
        // we remove only what is ABOVE the block.
        //
        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
        // will have three blocks to remove, which will be block 8, 7, and 6.
        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
        self.static_file_provider()
            .get_writer(block, StaticFileSegment::Headers)?
            .prune_headers(highest_static_file_block.saturating_sub(block))?;

        // First transaction to be removed
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        // Last transaction to be removed
        let unwind_tx_to = self
            .tx
            .cursor_read::<tables::BlockBodyIndices>()?
            .last()?
            // shouldn't happen because this was OK above
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
            .1
            .last_tx_num();

        // Only delete hash indices when there is at least one transaction above `block`.
        if unwind_tx_from <= unwind_tx_to {
            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
            self.with_rocksdb_batch(|batch| {
                // Hash-to-number entries may live in MDBX or RocksDB; `EitherWriter`
                // routes the deletes to the right backend.
                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
                for (hash, _) in hashes {
                    writer.delete_transaction_hash_number(hash)?;
                }
                Ok(((), writer.into_raw_rocksdb_batch()))
            })?;
        }

        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
        // exists in static files or the database.
        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
            EitherWriter::new_senders(self, last_block_number)?
                .prune_senders(unwind_tx_from, block)?;
        }

        // Finally drop the bodies themselves (indices, tx-block mapping, static-file txs).
        self.remove_bodies_above(block)?;

        Ok(())
    }
3613
    /// Removes block bodies strictly above `block`: chain-specific body parts, body
    /// indices, the tx-to-block mapping, and static-file transactions.
    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
        self.storage.writer().remove_block_bodies_above(self, block)?;

        // First transaction to be removed
        let unwind_tx_from = self
            .block_body_indices(block)?
            .map(|b| b.next_tx_num())
            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;

        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;

        let static_file_tx_num =
            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

        // Number of static-file transactions above `block`; saturates to zero when the
        // static files don't reach `unwind_tx_from`.
        let to_delete = static_file_tx_num
            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
            .unwrap_or_default();

        self.static_file_provider
            .latest_writer(StaticFileSegment::Transactions)?
            .prune_transactions(to_delete, block)?;

        Ok(())
    }
3639
3640    /// Appends blocks with their execution state to the database.
3641    ///
3642    /// **Note:** This function is only used in tests.
3643    ///
3644    /// History indices are written to the appropriate backend based on storage settings:
3645    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3646    ///
3647    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3648    fn append_blocks_with_state(
3649        &self,
3650        blocks: Vec<RecoveredBlock<Self::Block>>,
3651        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3652        hashed_state: HashedPostStateSorted,
3653    ) -> ProviderResult<()> {
3654        if blocks.is_empty() {
3655            debug!(target: "providers::db", "Attempted to append empty block range");
3656            return Ok(())
3657        }
3658
3659        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3660        // `None`.
3661        let first_number = blocks[0].number();
3662
3663        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3664        // `None`.
3665        let last_block_number = blocks[blocks.len() - 1].number();
3666
3667        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3668
3669        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3670        // This is necessary because with edge storage, changesets are written to static files
3671        // whose index isn't updated until commit, making them invisible to subsequent reads
3672        // within the same transaction.
3673        let (account_transitions, storage_transitions) = {
3674            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3675            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3676            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3677                let block_number = first_number + block_idx as u64;
3678                for (address, account_revert) in block_reverts {
3679                    account_transitions.entry(*address).or_default().push(block_number);
3680                    for storage_key in account_revert.storage.keys() {
3681                        let key = B256::from(storage_key.to_be_bytes());
3682                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3683                    }
3684                }
3685            }
3686            (account_transitions, storage_transitions)
3687        };
3688
3689        // Insert the blocks
3690        for block in blocks {
3691            self.insert_block(&block)?;
3692            durations_recorder.record_relative(metrics::Action::InsertBlock);
3693        }
3694
3695        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3696        durations_recorder.record_relative(metrics::Action::InsertState);
3697
3698        // insert hashes and intermediate merkle nodes
3699        self.write_hashed_state(&hashed_state)?;
3700        durations_recorder.record_relative(metrics::Action::InsertHashes);
3701
3702        // Use pre-computed transitions for history indices since static file
3703        // writes aren't visible until commit.
3704        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3705        // append_*_history_shard which handles read-merge-write internally.
3706        let storage_settings = self.cached_storage_settings();
3707        if storage_settings.storage_v2 {
3708            self.with_rocksdb_batch(|mut batch| {
3709                for (address, blocks) in account_transitions {
3710                    batch.append_account_history_shard(address, blocks)?;
3711                }
3712                Ok(((), Some(batch.into_inner())))
3713            })?;
3714        } else {
3715            self.insert_account_history_index(account_transitions)?;
3716        }
3717        if storage_settings.storage_v2 {
3718            self.with_rocksdb_batch(|mut batch| {
3719                for ((address, key), blocks) in storage_transitions {
3720                    batch.append_storage_history_shard(address, key, blocks)?;
3721                }
3722                Ok(((), Some(batch.into_inner())))
3723            })?;
3724        } else {
3725            self.insert_storage_history_index(storage_transitions)?;
3726        }
3727        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3728
3729        // Update pipeline progress
3730        self.update_pipeline_stages(last_block_number, false)?;
3731        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3732
3733        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3734
3735        Ok(())
3736    }
3737}
3738
3739impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3740    fn get_prune_checkpoint(
3741        &self,
3742        segment: PruneSegment,
3743    ) -> ProviderResult<Option<PruneCheckpoint>> {
3744        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3745    }
3746
3747    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3748        Ok(PruneSegment::variants()
3749            .filter_map(|segment| {
3750                self.tx
3751                    .get::<tables::PruneCheckpoints>(segment)
3752                    .transpose()
3753                    .map(|chk| chk.map(|chk| (segment, chk)))
3754            })
3755            .collect::<Result<_, _>>()?)
3756    }
3757}
3758
3759impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3760    fn save_prune_checkpoint(
3761        &self,
3762        segment: PruneSegment,
3763        checkpoint: PruneCheckpoint,
3764    ) -> ProviderResult<()> {
3765        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3766    }
3767}
3768
3769impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3770    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3771        let db_entries = self.tx.entries::<T>()?;
3772        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3773            Ok(entries) => entries,
3774            Err(ProviderError::UnsupportedProvider) => 0,
3775            Err(err) => return Err(err),
3776        };
3777
3778        Ok(db_entries + static_file_entries)
3779    }
3780}
3781
3782impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3783    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3784        let mut finalized_blocks = self
3785            .tx
3786            .cursor_read::<tables::ChainState>()?
3787            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3788            .take(1)
3789            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3790
3791        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3792        Ok(last_finalized_block_number)
3793    }
3794
3795    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3796        let mut finalized_blocks = self
3797            .tx
3798            .cursor_read::<tables::ChainState>()?
3799            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3800            .take(1)
3801            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3802
3803        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3804        Ok(last_finalized_block_number)
3805    }
3806}
3807
3808impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3809    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3810        Ok(self
3811            .tx
3812            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3813    }
3814
3815    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3816        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3817    }
3818}
3819
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider, returning the inner transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    fn prune_modes_ref(&self) -> &PruneModes {
        // Delegates to the inherent `prune_modes_ref`; inherent methods take
        // precedence over trait methods in resolution, so this is not recursive.
        self.prune_modes_ref()
    }

    /// Commit database transaction, static files, and pending `RocksDB` batches.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            // Unwind path uses a dedicated routine with its own commit ordering.
            self.commit_unwind()?;
        } else {
            // Normal path: finalize() will call sync_all() if not already synced
            let mut timings = metrics::CommitTimings::default();

            // 1) Static files are finalized first.
            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            // 2) Then all pending RocksDB batches accumulated during this
            // transaction are committed, draining the shared pending list.
            let start = Instant::now();
            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
            for batch in batches {
                self.rocksdb_provider.commit_batch(batch)?;
            }
            timings.rocksdb = start.elapsed();

            // 3) The MDBX transaction is committed last.
            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            // Per-phase timings are reported to the commit metrics.
            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3874
3875impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3876    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3877        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3878    }
3879}
3880
3881impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3882    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3883        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3884    }
3885}
3886
3887impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3888    fn cached_storage_settings(&self) -> StorageSettings {
3889        *self.storage_settings.read()
3890    }
3891
3892    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3893        *self.storage_settings.write() = settings;
3894    }
3895}
3896
3897impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3898    fn storage_path(&self) -> PathBuf {
3899        self.db_path.clone()
3900    }
3901}
3902
3903#[cfg(test)]
3904mod tests {
3905    use super::*;
3906    use crate::{
3907        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3908        BlockWriter,
3909    };
3910    use alloy_consensus::Header;
3911    use alloy_primitives::{
3912        map::{AddressMap, B256Map},
3913        U256,
3914    };
3915    use reth_chain_state::ExecutedBlock;
3916    use reth_db_api::models::StorageSettings;
3917    use reth_ethereum_primitives::Receipt;
3918    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3919    use reth_primitives_traits::SealedBlock;
3920    use reth_storage_api::MetadataWriter;
3921    use reth_testing_utils::generators::{self, random_block, BlockParams};
3922    use reth_trie::{
3923        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3924    };
3925    use revm_database::BundleState;
3926    use revm_state::AccountInfo;
3927    use std::{sync::mpsc, time::Duration};
3928
3929    #[test]
3930    fn test_receipts_by_block_range_empty_range() {
3931        let factory = create_test_provider_factory();
3932        let provider = factory.provider().unwrap();
3933
3934        // empty range should return empty vec
3935        let start = 10u64;
3936        let end = 9u64;
3937        let result = provider.receipts_by_block_range(start..=end).unwrap();
3938        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3939    }
3940
    #[test]
    fn unwind_commit_waits_for_pre_commit_readers() {
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Open a read transaction BEFORE the unwind write transaction so it
        // predates the commit being tested.
        let reader = factory.provider().unwrap();
        let provider_rw = factory.unwind_provider_rw().unwrap();
        provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
        let (done_tx, done_rx) = mpsc::channel();

        // Commit on a separate thread: it is expected to block while the older
        // reader is alive, so it must not hold up the test thread.
        let handle = std::thread::spawn(move || {
            let result = provider_rw.commit();
            done_tx.send(result).unwrap();
        });

        // A short timeout keeps the test fast while still detecting a commit
        // that (incorrectly) finished before the reader was dropped.
        assert!(
            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
            "unwind commit should wait while an older read transaction is still open"
        );

        // Releasing the pre-commit read transaction ...
        drop(reader);

        // ... must allow the commit to complete successfully within the grace period.
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
        handle.join().unwrap();
    }
3966
3967    #[test]
3968    fn test_receipts_by_block_range_nonexistent_blocks() {
3969        let factory = create_test_provider_factory();
3970        let provider = factory.provider().unwrap();
3971
3972        // non-existent blocks should return empty vecs for each block
3973        let result = provider.receipts_by_block_range(10..=12).unwrap();
3974        assert_eq!(result, vec![vec![], vec![], vec![]]);
3975    }
3976
    #[test]
    fn test_receipts_by_block_range_single_block() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis (block 0, empty receipts) followed by block 1 with its
        // execution outcome, then commit so the read-only provider can see it.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.insert_block(&data.blocks[0].0).unwrap();
        provider_rw
            .write_state(
                &data.blocks[0].1,
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=1).unwrap();

        // should have one vec with one receipt
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].len(), 1);
        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
    }
4009
    #[test]
    fn test_receipts_by_block_range_multiple_blocks() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis (block 0, empty receipts), then blocks 1-3 with their
        // execution outcomes, and commit so a reader can observe them.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // should have 3 vecs, each with one receipt
        assert_eq!(result.len(), 3);
        for (i, block_receipts) in result.iter().enumerate() {
            assert_eq!(block_receipts.len(), 1);
            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
        }
    }
4046
    #[test]
    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
        // NOTE(review): despite the name, every inserted block here carries
        // exactly one receipt — the setup mirrors the multiple-blocks test.
        // Consider actually varying tx counts per block to match the intent.
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();

        // insert blocks 1-3 with receipts
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();
        let result = provider.receipts_by_block_range(1..=3).unwrap();

        // verify each block has one receipt
        assert_eq!(result.len(), 3);
        for block_receipts in &result {
            assert_eq!(block_receipts.len(), 1);
        }
    }
4084
    #[test]
    fn test_receipts_by_block_range_partial_range() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis plus blocks 1-3 with receipts; blocks 4+ do not exist.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // request range that includes both existing and non-existing blocks
        let result = provider.receipts_by_block_range(2..=5).unwrap();
        assert_eq!(result.len(), 4);

        // blocks 2-3 should have receipts, blocks 4-5 should be empty
        assert_eq!(result[0].len(), 1); // block 2
        assert_eq!(result[1].len(), 1); // block 3
        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)

        // `data.blocks` is zero-indexed from block 1, so block 2 == blocks[1].
        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
    }
4126
4127    #[test]
4128    fn test_receipts_by_block_range_all_empty_blocks() {
4129        let factory = create_test_provider_factory();
4130        let mut rng = generators::rng();
4131
4132        // create blocks with no transactions
4133        let mut blocks = Vec::new();
4134        for i in 0..3 {
4135            let block =
4136                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4137            blocks.push(block);
4138        }
4139
4140        let provider_rw = factory.provider_rw().unwrap();
4141        for block in blocks {
4142            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4143        }
4144        provider_rw.commit().unwrap();
4145
4146        let provider = factory.provider().unwrap();
4147        let result = provider.receipts_by_block_range(1..=3).unwrap();
4148
4149        assert_eq!(result.len(), 3);
4150        for block_receipts in result {
4151            assert_eq!(block_receipts.len(), 0);
4152        }
4153    }
4154
    #[test]
    fn test_receipts_by_block_range_consistency_with_individual_calls() {
        let factory = create_test_provider_factory();
        let data = BlockchainTestData::default();

        // Insert genesis plus blocks 1-3 with receipts, then commit.
        let provider_rw = factory.provider_rw().unwrap();
        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
        provider_rw
            .write_state(
                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
                crate::OriginalValuesKnown::No,
                StateWriteConfig::default(),
            )
            .unwrap();
        for i in 0..3 {
            provider_rw.insert_block(&data.blocks[i].0).unwrap();
            provider_rw
                .write_state(
                    &data.blocks[i].1,
                    crate::OriginalValuesKnown::No,
                    StateWriteConfig::default(),
                )
                .unwrap();
        }
        provider_rw.commit().unwrap();

        let provider = factory.provider().unwrap();

        // get receipts using block range method
        let range_result = provider.receipts_by_block_range(1..=3).unwrap();

        // get receipts using individual block calls
        let mut individual_results = Vec::new();
        for block_num in 1..=3 {
            let receipts =
                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
            individual_results.push(receipts);
        }

        // Both access paths must agree exactly.
        assert_eq!(range_result, individual_results);
    }
4196
    #[test]
    fn test_write_trie_updates_sorted() {
        // End-to-end check of `write_trie_updates_sorted`: pre-populate account
        // and storage trie tables, apply a sorted update set containing inserts,
        // updates, a deletion, and a full storage wipe, then verify table state.
        use reth_trie::{
            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
            BranchNodeCompact, StorageTrieEntry,
        };

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        // Pre-populate account trie with data that will be deleted
        {
            let tx = provider_rw.tx_ref();
            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();

            // Add account node that will be deleted
            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
            cursor
                .upsert(
                    to_delete,
                    &BranchNodeCompact::new(
                        0b1010_1010_1010_1010, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();

            // Add account node that will be updated
            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
            cursor
                .upsert(
                    to_update,
                    &BranchNodeCompact::new(
                        0b0101_0101_0101_0101, // old state_mask (will be updated)
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask
                        vec![],
                        None,
                    ),
                )
                .unwrap();
        }

        // Pre-populate storage tries with data
        let storage_address1 = B256::from([1u8; 32]);
        let storage_address2 = B256::from([2u8; 32]);
        {
            let tx = provider_rw.tx_ref();
            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();

            // Add storage nodes for address1 (one will be deleted)
            storage_cursor
                .upsert(
                    storage_address1,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
                        node: BranchNodeCompact::new(
                            0b0011_0011_0011_0011, // will be deleted
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();

            // Add storage nodes for address2 (will be wiped)
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
                        node: BranchNodeCompact::new(
                            0b1100_1100_1100_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
            storage_cursor
                .upsert(
                    storage_address2,
                    &StorageTrieEntry {
                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
                        node: BranchNodeCompact::new(
                            0b0011_1100_0011_1100, // will be wiped
                            0b0000_0000_0000_0000,
                            0b0000_0000_0000_0000,
                            vec![],
                            None,
                        ),
                    },
                )
                .unwrap();
        }

        // Create sorted account trie updates: one update of an existing node,
        // one deletion (None), and one brand-new insertion.
        let account_nodes = vec![
            (
                Nibbles::from_nibbles([0x1, 0x2]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask (updated)
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
            (
                Nibbles::from_nibbles([0x5, 0x6]),
                Some(BranchNodeCompact::new(
                    0b1111_1111_1111_1111, // state_mask
                    0b0000_0000_0000_0000, // tree_mask
                    0b0000_0000_0000_0000, // hash_mask (no hashes)
                    vec![],
                    None,
                )),
            ),
        ];

        // Create sorted storage trie updates
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (
                    Nibbles::from_nibbles([0x1, 0x0]),
                    Some(BranchNodeCompact::new(
                        0b1111_0000_0000_0000, // state_mask
                        0b0000_0000_0000_0000, // tree_mask
                        0b0000_0000_0000_0000, // hash_mask (no hashes)
                        vec![],
                        None,
                    )),
                ),
                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
            ],
        };

        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true, // Wipe all storage for this address
            storage_nodes: vec![],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the sorted trie updates
        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();

        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
        // storage deletion = 5
        assert_eq!(num_entries, 5);

        // Verify account trie updates were written correctly
        let tx = provider_rw.tx_ref();
        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();

        // Check first account node was updated
        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
        let entry1 = cursor.seek_exact(nibbles1).unwrap();
        assert!(entry1.is_some(), "Updated account node should exist");
        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
        assert_eq!(
            entry1.unwrap().1.state_mask,
            expected_mask,
            "Account node should have updated state_mask"
        );

        // Check deleted account node no longer exists
        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
        let entry2 = cursor.seek_exact(nibbles2).unwrap();
        assert!(entry2.is_none(), "Deleted account node should not exist");

        // Check new account node exists
        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
        let entry3 = cursor.seek_exact(nibbles3).unwrap();
        assert!(entry3.is_some(), "New account node should exist");

        // Verify storage trie updates were written correctly
        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();

        // Check storage for address1
        let storage_entries1: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address1), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            storage_entries1.len(),
            1,
            "Storage address1 should have 1 entry after deletion"
        );
        assert_eq!(
            storage_entries1[0].1.nibbles.0,
            Nibbles::from_nibbles([0x1, 0x0]),
            "Remaining entry should be [0x1, 0x0]"
        );

        // Check storage for address2 was wiped
        let storage_entries2: Vec<_> = storage_cursor
            .walk_dup(Some(storage_address2), None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");

        provider_rw.commit().unwrap();
    }
4416
    /// Exercises receipt pruning during `write_state` for both storage layouts:
    /// legacy/v1 (receipts in MDBX, which tolerates gaps and is freely prunable)
    /// and v2 (receipts in static files, which do not support gaps, so skipping
    /// is only allowed before the first receipt has been written).
    #[test]
    fn test_prunable_receipts_logic() {
        // Inserts blocks 0..=tip_block, each with `tx_count` random transactions.
        let insert_blocks =
            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
                let mut rng = generators::rng();
                for block_num in 0..=tip_block {
                    let block = random_block(
                        &mut rng,
                        block_num,
                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
                    );
                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
                }
            };

        // Writes a single receipt for `block` (via `write_state`) and commits.
        // The receipt's `cumulative_gas_used` is set to the block number so the
        // assertions below can identify which receipt they read back.
        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
            let outcome = ExecutionOutcome {
                first_block: block,
                receipts: vec![vec![Receipt {
                    tx_type: Default::default(),
                    success: true,
                    cumulative_gas_used: block, // identifier to assert against
                    logs: vec![],
                }]],
                ..Default::default()
            };
            provider_rw
                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
                .unwrap();
            provider_rw.commit().unwrap();
        };

        // Legacy mode (receipts in DB) - should be prunable
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v1();
            factory.set_storage_settings_cache(storage_settings);
            // Prune receipts for all blocks before 100.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });

            let tip_block = 200u64;
            let first_block = 1u64;

            // create chain
            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Block 1 falls under `Before(100)` and should be pruned (skipped);
            // block 199 is past the cutoff and should be persisted.
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                first_block,
            );
            write_receipts(
                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
                tip_block - 1,
            );

            let provider = factory.provider().unwrap();

            // Block 0 has no receipts stored; block 199 has exactly one.
            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
                assert!(provider
                    .receipts_by_block(block.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));
            }
        }

        // Static files mode
        {
            let factory = create_test_provider_factory();
            let storage_settings = StorageSettings::v2();
            factory.set_storage_settings_cache(storage_settings);
            // Only blocks 0 and 1 are prunable here.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(2)),
                ..Default::default()
            });

            let tip_block = 200u64;

            // create chain
            let provider_rw = factory.provider_rw().unwrap();
            insert_blocks(&provider_rw, tip_block, 1);
            provider_rw.commit().unwrap();

            // Attempt to write receipts for block 0 and 1 (should be skipped)
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);

            // No receipt data written yet, but the static-file block height has
            // still advanced to 1 (empty blocks, no gaps).
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_none(),);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_block(StaticFileSegment::Receipts)
                .is_some_and(|b| b == 1),);

            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
            // static files.
            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
            assert!(factory
                .static_file_provider()
                .get_highest_static_file_tx(StaticFileSegment::Receipts)
                .is_some_and(|num| num == 2),);

            // After having a receipt already in static files, attempt to skip the next receipt by
            // changing the prune mode. It should NOT skip it and should still write the receipt,
            // since static files do not support gaps.
            let factory = factory.with_prune_modes(PruneModes {
                receipts: Some(PruneMode::Before(100)),
                ..Default::default()
            });
            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
            write_receipts(provider_rw, 3);

            // Ensure we can only fetch the 2 last receipts.
            //
            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
            // to the block number it belongs to easily identify and assert.
            let provider = factory.provider().unwrap();
            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
                assert!(provider
                    .receipts_by_block(num.into())
                    .unwrap()
                    .is_some_and(|r| r.len() == num_receipts));

                let receipt = provider.receipt(num).unwrap();
                if num_receipts > 0 {
                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
                } else {
                    assert!(receipt.is_none());
                }
            }
        }
    }
4556
4557    #[test]
4558    fn test_try_into_history_rejects_unexecuted_blocks() {
4559        use reth_storage_api::TryIntoHistoricalStateProvider;
4560
4561        let factory = create_test_provider_factory();
4562
4563        // Insert genesis block to have some data
4564        let data = BlockchainTestData::default();
4565        let provider_rw = factory.provider_rw().unwrap();
4566        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4567        provider_rw
4568            .write_state(
4569                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4570                crate::OriginalValuesKnown::No,
4571                StateWriteConfig::default(),
4572            )
4573            .unwrap();
4574        provider_rw.commit().unwrap();
4575
4576        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4577        let provider = factory.provider().unwrap();
4578
4579        // Requesting historical state for block 0 (executed) should succeed
4580        let result = provider.try_into_history_at_block(0);
4581        assert!(result.is_ok(), "Block 0 should be available");
4582
4583        // Get another provider and request state for block 100 (not executed)
4584        let provider = factory.provider().unwrap();
4585        let result = provider.try_into_history_at_block(100);
4586
4587        // Should fail with BlockNotExecuted error
4588        match result {
4589            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4590            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4591            Ok(_) => panic!("Expected error, got Ok"),
4592        }
4593    }
4594
4595    #[test]
4596    fn test_unwind_storage_hashing_with_hashed_state() {
4597        let factory = create_test_provider_factory();
4598        let storage_settings = StorageSettings::v2();
4599        factory.set_storage_settings_cache(storage_settings);
4600
4601        let address = Address::random();
4602        let hashed_address = keccak256(address);
4603
4604        let plain_slot = B256::random();
4605        let hashed_slot = keccak256(plain_slot);
4606
4607        let current_value = U256::from(100);
4608        let old_value = U256::from(42);
4609
4610        let provider_rw = factory.provider_rw().unwrap();
4611        provider_rw
4612            .tx
4613            .cursor_dup_write::<tables::HashedStorages>()
4614            .unwrap()
4615            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4616            .unwrap();
4617
4618        let changesets = vec![(
4619            BlockNumberAddress((1, address)),
4620            StorageEntry { key: plain_slot, value: old_value },
4621        )];
4622
4623        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4624
4625        assert_eq!(result.len(), 1);
4626        assert!(result.contains_key(&hashed_address));
4627        assert!(result[&hashed_address].contains(&hashed_slot));
4628
4629        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4630        let entry = cursor
4631            .seek_by_key_subkey(hashed_address, hashed_slot)
4632            .unwrap()
4633            .expect("entry should exist");
4634        assert_eq!(entry.key, hashed_slot);
4635        assert_eq!(entry.value, old_value);
4636    }
4637
    /// Round-trips state through the legacy (v1) layout: writes a block-1 state
    /// transition (account nonce bump + new storage slot) with changesets,
    /// verifies plain/hashed tables and MDBX changesets, then unwinds with
    /// `remove_state_above(0)` and verifies everything reverted.
    #[test]
    fn test_write_and_remove_state_roundtrip_legacy() {
        let factory = create_test_provider_factory();
        let storage_settings = StorageSettings::v1();
        // v1 keeps plain (unhashed) state tables.
        assert!(!storage_settings.use_hashed_state());
        factory.set_storage_settings_cache(storage_settings);

        // Fixed account/slot plus the hashed keys used by the hashed tables.
        let address = Address::with_last_byte(1);
        let hashed_address = keccak256(address);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_slot = keccak256(slot_key);

        let mut rng = generators::rng();
        let block0 =
            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
        let block1 =
            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });

        // Seed the chain: two empty blocks and the pre-state account (nonce 0).
        {
            let provider_rw = factory.provider_rw().unwrap();
            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
            provider_rw
                .tx
                .cursor_write::<tables::PlainAccountState>()
                .unwrap()
                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
                .unwrap();
            provider_rw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Post-state for block 1: account nonce 0 -> 1, slot 0 -> 10.
        // The tuple is (old account, new account, storage map of (old, new)).
        let mut state_init: BundleStateInit = AddressMap::default();
        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
        state_init.insert(
            address,
            (
                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
                storage_map,
            ),
        );

        // Reverts for block 1: the pre-block account and storage values needed
        // to unwind the transition above.
        let mut reverts_init: RevertsInit = HashMap::default();
        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
        block_reverts.insert(
            address,
            (
                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
            ),
        );
        reverts_init.insert(1, block_reverts);

        let execution_outcome =
            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);

        // Write state including both account and storage changesets (the data
        // `remove_state_above` needs to unwind later).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Also populate the hashed tables from the same outcome.
        let hashed_state =
            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // Plain account reflects the post-state (nonce 1).
        let account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(account.nonce, 1);

        // Plain storage reflects the post-state (slot = 10).
        let storage_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap()
            .unwrap();
        assert_eq!(storage_entry.key, slot_key);
        assert_eq!(storage_entry.value, U256::from(10));

        // Hashed storage mirrors the plain storage under hashed keys.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // v1 writes changesets into MDBX: both tables must have block-1 rows.
        let account_cs_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!account_cs_entries.is_empty());

        let storage_cs_entries = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(!storage_cs_entries.is_empty());
        assert_eq!(storage_cs_entries[0].1.key, slot_key);

        // Unwind everything above block 0 using the changesets written above.
        provider_rw.remove_state_above(0).unwrap();

        // Account reverted to its pre-block-1 state (nonce 0).
        let restored_account = provider_rw
            .tx
            .cursor_read::<tables::PlainAccountState>()
            .unwrap()
            .seek_exact(address)
            .unwrap()
            .unwrap()
            .1;
        assert_eq!(restored_account.nonce, 0);

        // The slot had old value ZERO, so the storage entry is gone (or at
        // least no longer keyed by this slot).
        let storage_gone = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .seek_by_key_subkey(address, slot_key)
            .unwrap();
        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);

        // The consumed changesets are removed as part of the unwind.
        let account_cs_after = provider_rw
            .tx
            .cursor_dup_read::<tables::AccountChangeSets>()
            .unwrap()
            .walk(Some(1))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(account_cs_after.is_empty());

        let storage_cs_after = provider_rw
            .tx
            .cursor_read::<tables::StorageChangeSets>()
            .unwrap()
            .walk(Some(BlockNumberAddress((1, address))))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(storage_cs_after.is_empty());
    }
4805
4806    #[test]
4807    fn test_unwind_storage_hashing_legacy() {
4808        let factory = create_test_provider_factory();
4809        let storage_settings = StorageSettings::v1();
4810        assert!(!storage_settings.use_hashed_state());
4811        factory.set_storage_settings_cache(storage_settings);
4812
4813        let address = Address::random();
4814        let hashed_address = keccak256(address);
4815
4816        let plain_slot = B256::random();
4817        let hashed_slot = keccak256(plain_slot);
4818
4819        let current_value = U256::from(100);
4820        let old_value = U256::from(42);
4821
4822        let provider_rw = factory.provider_rw().unwrap();
4823        provider_rw
4824            .tx
4825            .cursor_dup_write::<tables::HashedStorages>()
4826            .unwrap()
4827            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4828            .unwrap();
4829
4830        let changesets = vec![(
4831            BlockNumberAddress((1, address)),
4832            StorageEntry { key: plain_slot, value: old_value },
4833        )];
4834
4835        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4836
4837        assert_eq!(result.len(), 1);
4838        assert!(result.contains_key(&hashed_address));
4839        assert!(result[&hashed_address].contains(&hashed_slot));
4840
4841        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4842        let entry = cursor
4843            .seek_by_key_subkey(hashed_address, hashed_slot)
4844            .unwrap()
4845            .expect("entry should exist");
4846        assert_eq!(entry.key, hashed_slot);
4847        assert_eq!(entry.value, old_value);
4848    }
4849
    /// Under the v2 layout, `write_state` must populate only the hashed state
    /// tables (plain tables stay empty), route changesets to static files, and
    /// a historical read at block 0 must see the pre-block-1 value (`None`).
    #[test]
    fn test_write_state_and_historical_read_hashed() {
        use reth_storage_api::StateProvider;
        use reth_trie::{HashedPostState, KeccakKeyHasher};
        use revm_database::BundleState;
        use revm_state::AccountInfo;

        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v2());

        // Fixed account/slot plus the hashed keys used by the hashed tables.
        let address = Address::with_last_byte(1);
        let slot = U256::from(5);
        let slot_key = B256::from(slot);
        let hashed_address = keccak256(address);
        let hashed_slot = keccak256(slot_key);

        // Seed the static files: headers for blocks 0 and 1, plus empty
        // block-0 changesets so the changeset segments have no gap before the
        // block-1 data written below.
        {
            let sf = factory.static_file_provider();
            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
            hw.append_header(&h0, &B256::ZERO).unwrap();
            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
            hw.append_header(&h1, &B256::ZERO).unwrap();
            hw.commit().unwrap();

            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            aw.append_account_changeset(vec![], 0).unwrap();
            aw.commit().unwrap();

            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
            sw.append_storage_changeset(vec![], 0).unwrap();
            sw.commit().unwrap();
        }

        let provider_rw = factory.provider_rw().unwrap();

        // Block-1 state transition: account created (nonce 1, balance 10) and
        // slot 0 -> 10, with reverts recording the account/slot did not exist.
        let bundle = BundleState::builder(1..=1)
            .state_present_account_info(
                address,
                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
            )
            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
            .revert_account_info(1, address, Some(None))
            .revert_storage(1, address, vec![(slot, U256::ZERO)])
            .build();

        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());

        // Body indices for block 1 (no transactions) so block-keyed writes work.
        provider_rw
            .tx
            .put::<tables::BlockBodyIndices>(
                1,
                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
            )
            .unwrap();

        // Write state including both changeset kinds (routed to static files
        // under v2).
        provider_rw
            .write_state(
                &execution_outcome,
                OriginalValuesKnown::Yes,
                StateWriteConfig {
                    write_receipts: false,
                    write_account_changesets: true,
                    write_storage_changesets: true,
                },
            )
            .unwrap();

        // Populate the hashed tables from the same bundle.
        let hashed_state =
            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
        provider_rw.write_hashed_state(&hashed_state).unwrap();

        // v2 must not touch the plain storage table.
        let plain_storage_entries = provider_rw
            .tx
            .cursor_dup_read::<tables::PlainStorageState>()
            .unwrap()
            .walk(None)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert!(plain_storage_entries.is_empty());

        // The hashed storage table carries the post-state value.
        let hashed_entry = provider_rw
            .tx
            .cursor_dup_read::<tables::HashedStorages>()
            .unwrap()
            .seek_by_key_subkey(hashed_address, hashed_slot)
            .unwrap()
            .unwrap();
        assert_eq!(hashed_entry.key, hashed_slot);
        assert_eq!(hashed_entry.value, U256::from(10));

        // Flush pending static-file writes so they are readable below.
        provider_rw.static_file_provider().commit().unwrap();

        // Block-1 changesets landed in static files, keyed by plain (unhashed)
        // slot key and address.
        let sf = factory.static_file_provider();
        let storage_cs = sf.storage_changeset(1).unwrap();
        assert!(!storage_cs.is_empty());
        assert_eq!(storage_cs[0].1.key, slot_key);

        let account_cs = sf.account_block_changeset(1).unwrap();
        assert!(!account_cs.is_empty());
        assert_eq!(account_cs[0].address, address);

        // Historical read at block 0: the slot did not exist before block 1.
        let historical_value =
            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
        assert_eq!(historical_value, None);
    }
4957
    /// Storage schema variant exercised by `run_save_blocks_and_verify`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum StorageMode {
        /// Legacy layout: plain state and changesets live in MDBX.
        V1,
        /// Hashed-state layout: plain MDBX tables stay empty; changesets go to
        /// static files and history indices to RocksDB.
        V2,
    }
4963
4964    fn run_save_blocks_and_verify(mode: StorageMode) {
4965        use alloy_primitives::map::{FbBuildHasher, HashMap};
4966
4967        let factory = create_test_provider_factory();
4968
4969        match mode {
4970            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4971            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4972        }
4973
4974        let num_blocks = 3u64;
4975        let accounts_per_block = 5usize;
4976        let slots_per_account = 3usize;
4977
4978        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4979            SealedHeader::new(
4980                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4981                B256::ZERO,
4982            ),
4983            Default::default(),
4984        );
4985
4986        let genesis_executed = ExecutedBlock::new(
4987            Arc::new(genesis.try_recover().unwrap()),
4988            Arc::new(BlockExecutionOutput {
4989                result: BlockExecutionResult {
4990                    receipts: vec![],
4991                    requests: Default::default(),
4992                    gas_used: 0,
4993                    blob_gas_used: 0,
4994                },
4995                state: Default::default(),
4996            }),
4997            ComputedTrieData::default(),
4998        );
4999        let provider_rw = factory.provider_rw().unwrap();
5000        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5001        provider_rw.commit().unwrap();
5002
5003        let mut blocks: Vec<ExecutedBlock> = Vec::new();
5004        let mut parent_hash = B256::ZERO;
5005
5006        for block_num in 1..=num_blocks {
5007            let mut builder = BundleState::builder(block_num..=block_num);
5008
5009            for acct_idx in 0..accounts_per_block {
5010                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5011                let info = AccountInfo {
5012                    nonce: block_num,
5013                    balance: U256::from(block_num * 100 + acct_idx as u64),
5014                    ..Default::default()
5015                };
5016
5017                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5018                    slots_per_account as u64)
5019                    .map(|s| {
5020                        (
5021                            U256::from(s + acct_idx as u64 * 100),
5022                            (U256::ZERO, U256::from(block_num * 1000 + s)),
5023                        )
5024                    })
5025                    .collect();
5026
5027                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5028                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5029                    .collect();
5030
5031                builder = builder
5032                    .state_present_account_info(address, info)
5033                    .revert_account_info(block_num, address, Some(None))
5034                    .state_storage(address, storage)
5035                    .revert_storage(block_num, address, revert_storage);
5036            }
5037
5038            let bundle = builder.build();
5039
5040            let hashed_state =
5041                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5042
5043            let header = Header {
5044                number: block_num,
5045                parent_hash,
5046                difficulty: U256::from(1),
5047                ..Default::default()
5048            };
5049            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5050                header,
5051                Default::default(),
5052            );
5053            parent_hash = block.hash();
5054
5055            let executed = ExecutedBlock::new(
5056                Arc::new(block.try_recover().unwrap()),
5057                Arc::new(BlockExecutionOutput {
5058                    result: BlockExecutionResult {
5059                        receipts: vec![],
5060                        requests: Default::default(),
5061                        gas_used: 0,
5062                        blob_gas_used: 0,
5063                    },
5064                    state: bundle,
5065                }),
5066                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5067            );
5068            blocks.push(executed);
5069        }
5070
5071        let provider_rw = factory.provider_rw().unwrap();
5072        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5073        provider_rw.commit().unwrap();
5074
5075        let provider = factory.provider().unwrap();
5076
5077        for block_num in 1..=num_blocks {
5078            for acct_idx in 0..accounts_per_block {
5079                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5080                let hashed_address = keccak256(address);
5081
5082                let ha_entry = provider
5083                    .tx_ref()
5084                    .cursor_read::<tables::HashedAccounts>()
5085                    .unwrap()
5086                    .seek_exact(hashed_address)
5087                    .unwrap();
5088                assert!(
5089                    ha_entry.is_some(),
5090                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5091                );
5092
5093                for s in 1..=slots_per_account as u64 {
5094                    let slot = U256::from(s + acct_idx as u64 * 100);
5095                    let slot_key = B256::from(slot);
5096                    let hashed_slot = keccak256(slot_key);
5097
5098                    let hs_entry = provider
5099                        .tx_ref()
5100                        .cursor_dup_read::<tables::HashedStorages>()
5101                        .unwrap()
5102                        .seek_by_key_subkey(hashed_address, hashed_slot)
5103                        .unwrap();
5104                    assert!(
5105                        hs_entry.is_some(),
5106                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5107                    );
5108                    let entry = hs_entry.unwrap();
5109                    assert_eq!(entry.key, hashed_slot);
5110                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5111                }
5112            }
5113        }
5114
5115        for block_num in 1..=num_blocks {
5116            let header = provider.header_by_number(block_num).unwrap();
5117            assert!(header.is_some(), "Header missing for block {block_num}");
5118
5119            let indices = provider.block_body_indices(block_num).unwrap();
5120            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5121        }
5122
5123        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5124        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5125
5126        if mode == StorageMode::V2 {
5127            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5128            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5129
5130            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5131            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5132
5133            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5134            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5135
5136            provider.static_file_provider().commit().unwrap();
5137            let sf = factory.static_file_provider();
5138
5139            for block_num in 1..=num_blocks {
5140                let account_cs = sf.account_block_changeset(block_num).unwrap();
5141                assert!(
5142                    !account_cs.is_empty(),
5143                    "v2: static file AccountChangeSets should exist for block {block_num}"
5144                );
5145
5146                let storage_cs = sf.storage_changeset(block_num).unwrap();
5147                assert!(
5148                    !storage_cs.is_empty(),
5149                    "v2: static file StorageChangeSets should exist for block {block_num}"
5150                );
5151
5152                for (_, entry) in &storage_cs {
5153                    assert!(
5154                        entry.key != keccak256(entry.key),
5155                        "v2: static file storage changeset should have plain slot keys"
5156                    );
5157                }
5158            }
5159
5160            let rocksdb = factory.rocksdb_provider();
5161            for block_num in 1..=num_blocks {
5162                for acct_idx in 0..accounts_per_block {
5163                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5164                    let shards = rocksdb.account_history_shards(address).unwrap();
5165                    assert!(
5166                        !shards.is_empty(),
5167                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5168                    );
5169
5170                    for s in 1..=slots_per_account as u64 {
5171                        let slot = U256::from(s + acct_idx as u64 * 100);
5172                        let slot_key = B256::from(slot);
5173                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5174                        assert!(
5175                            !shards.is_empty(),
5176                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5177                        );
5178                    }
5179                }
5180            }
5181        } else {
5182            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5183            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5184
5185            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5186            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5187
5188            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5189            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5190
5191            for block_num in 1..=num_blocks {
5192                let storage_entries: Vec<_> = provider
5193                    .tx_ref()
5194                    .cursor_dup_read::<tables::StorageChangeSets>()
5195                    .unwrap()
5196                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5197                    .unwrap()
5198                    .collect::<Result<Vec<_>, _>>()
5199                    .unwrap();
5200                assert!(
5201                    !storage_entries.is_empty(),
5202                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5203                );
5204
5205                for (_, entry) in &storage_entries {
5206                    let slot_key = B256::from(entry.key);
5207                    assert!(
5208                        slot_key != keccak256(slot_key),
5209                        "v1: storage changeset keys should be plain (not hashed)"
5210                    );
5211                }
5212            }
5213
5214            let mdbx_account_history =
5215                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5216            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5217
5218            let mdbx_storage_history =
5219                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5220            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5221        }
5222    }
5223
    /// Legacy (v1) storage mode: `save_blocks` must populate the plain state,
    /// changeset and history tables in MDBX (see the v1 branch of
    /// `run_save_blocks_and_verify`).
    #[test]
    fn test_save_blocks_v1_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V1);
    }
5228
    /// v2 storage mode: `save_blocks` must leave the plain MDBX state/changeset
    /// tables empty, route changesets to static files and history indices to
    /// RocksDB (see the v2 branch of `run_save_blocks_and_verify`).
    #[test]
    fn test_save_blocks_v2_table_assertions() {
        run_save_blocks_and_verify(StorageMode::V2);
    }
5233
5234    #[test]
5235    fn test_write_and_remove_state_roundtrip_v2() {
5236        let factory = create_test_provider_factory();
5237        let storage_settings = StorageSettings::v2();
5238        assert!(storage_settings.use_hashed_state());
5239        factory.set_storage_settings_cache(storage_settings);
5240
5241        let address = Address::with_last_byte(1);
5242        let hashed_address = keccak256(address);
5243        let slot = U256::from(5);
5244        let slot_key = B256::from(slot);
5245        let hashed_slot = keccak256(slot_key);
5246
5247        {
5248            let sf = factory.static_file_provider();
5249            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5250            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5251            hw.append_header(&h0, &B256::ZERO).unwrap();
5252            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5253            hw.append_header(&h1, &B256::ZERO).unwrap();
5254            hw.commit().unwrap();
5255
5256            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5257            aw.append_account_changeset(vec![], 0).unwrap();
5258            aw.commit().unwrap();
5259
5260            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5261            sw.append_storage_changeset(vec![], 0).unwrap();
5262            sw.commit().unwrap();
5263        }
5264
5265        {
5266            let provider_rw = factory.provider_rw().unwrap();
5267            provider_rw
5268                .tx
5269                .put::<tables::BlockBodyIndices>(
5270                    0,
5271                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5272                )
5273                .unwrap();
5274            provider_rw
5275                .tx
5276                .put::<tables::BlockBodyIndices>(
5277                    1,
5278                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5279                )
5280                .unwrap();
5281            provider_rw
5282                .tx
5283                .cursor_write::<tables::HashedAccounts>()
5284                .unwrap()
5285                .upsert(
5286                    hashed_address,
5287                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5288                )
5289                .unwrap();
5290            provider_rw.commit().unwrap();
5291        }
5292
5293        let provider_rw = factory.provider_rw().unwrap();
5294
5295        let bundle = BundleState::builder(1..=1)
5296            .state_present_account_info(
5297                address,
5298                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5299            )
5300            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5301            .revert_account_info(1, address, Some(None))
5302            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5303            .build();
5304
5305        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5306
5307        provider_rw
5308            .write_state(
5309                &execution_outcome,
5310                OriginalValuesKnown::Yes,
5311                StateWriteConfig {
5312                    write_receipts: false,
5313                    write_account_changesets: true,
5314                    write_storage_changesets: true,
5315                },
5316            )
5317            .unwrap();
5318
5319        let hashed_state =
5320            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5321        provider_rw.write_hashed_state(&hashed_state).unwrap();
5322
5323        let hashed_account = provider_rw
5324            .tx
5325            .cursor_read::<tables::HashedAccounts>()
5326            .unwrap()
5327            .seek_exact(hashed_address)
5328            .unwrap()
5329            .unwrap()
5330            .1;
5331        assert_eq!(hashed_account.nonce, 1);
5332
5333        let hashed_entry = provider_rw
5334            .tx
5335            .cursor_dup_read::<tables::HashedStorages>()
5336            .unwrap()
5337            .seek_by_key_subkey(hashed_address, hashed_slot)
5338            .unwrap()
5339            .unwrap();
5340        assert_eq!(hashed_entry.key, hashed_slot);
5341        assert_eq!(hashed_entry.value, U256::from(10));
5342
5343        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5344        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5345
5346        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5347        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5348
5349        provider_rw.static_file_provider().commit().unwrap();
5350
5351        let sf = factory.static_file_provider();
5352        let storage_cs = sf.storage_changeset(1).unwrap();
5353        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5354        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5355
5356        provider_rw.remove_state_above(0).unwrap();
5357
5358        let restored_account = provider_rw
5359            .tx
5360            .cursor_read::<tables::HashedAccounts>()
5361            .unwrap()
5362            .seek_exact(hashed_address)
5363            .unwrap();
5364        assert!(
5365            restored_account.is_none(),
5366            "v2: account should be removed (didn't exist before block 1)"
5367        );
5368
5369        let storage_gone = provider_rw
5370            .tx
5371            .cursor_dup_read::<tables::HashedStorages>()
5372            .unwrap()
5373            .seek_by_key_subkey(hashed_address, hashed_slot)
5374            .unwrap();
5375        assert!(
5376            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5377            "v2: storage should be reverted (removed or different key)"
5378        );
5379
5380        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5381        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5382
5383        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5384        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5385    }
5386
5387    #[test]
5388    fn test_unwind_storage_history_indices_v2() {
5389        let factory = create_test_provider_factory();
5390        factory.set_storage_settings_cache(StorageSettings::v2());
5391
5392        let address = Address::with_last_byte(1);
5393        let slot_key = B256::from(U256::from(42));
5394
5395        {
5396            let rocksdb = factory.rocksdb_provider();
5397            let mut batch = rocksdb.batch();
5398            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5399            batch.commit().unwrap();
5400
5401            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5402            assert!(!shards.is_empty(), "history should be written to rocksdb");
5403        }
5404
5405        let provider_rw = factory.provider_rw().unwrap();
5406
5407        let changesets = vec![
5408            (
5409                BlockNumberAddress((7, address)),
5410                StorageEntry { key: slot_key, value: U256::from(5) },
5411            ),
5412            (
5413                BlockNumberAddress((10, address)),
5414                StorageEntry { key: slot_key, value: U256::from(8) },
5415            ),
5416        ];
5417
5418        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5419        assert_eq!(count, 2);
5420
5421        provider_rw.commit().unwrap();
5422
5423        let rocksdb = factory.rocksdb_provider();
5424        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5425
5426        assert!(
5427            !shards.is_empty(),
5428            "history shards should still exist with block 3 after partial unwind"
5429        );
5430
5431        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5432        assert!(all_blocks.contains(&3), "block 3 should remain");
5433        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5434        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5435    }
5436}