Skip to main content

reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::Database,
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader, StoragePath,
65    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77    cmp::Ordering,
78    collections::{BTreeMap, BTreeSet},
79    fmt::Debug,
80    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81    path::PathBuf,
82    sync::Arc,
83};
84use tracing::{debug, instrument, trace};
85
86/// Determines the commit order for database operations.
87#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
88pub enum CommitOrder {
89    /// Normal commit order: static files first, then `RocksDB`, then MDBX.
90    #[default]
91    Normal,
92    /// Unwind commit order: MDBX first, then `RocksDB`, then static files.
93    /// Used for unwind operations to allow recovery by truncating static files on restart.
94    Unwind,
95}
96
97impl CommitOrder {
98    /// Returns true if this is unwind commit order.
99    pub const fn is_unwind(&self) -> bool {
100        matches!(self, Self::Unwind)
101    }
102}
103
104/// A [`DatabaseProvider`] that holds a read-only database transaction.
105pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
107/// A [`DatabaseProvider`] that holds a read-write database transaction.
108///
109/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
110/// Once that issue is solved, we can probably revert back to being an alias type.
111#[derive(Debug)]
112pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
113    pub DatabaseProvider<<DB as Database>::TXMut, N>,
114);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119    fn deref(&self) -> &Self::Target {
120        &self.0
121    }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125    fn deref_mut(&mut self) -> &mut Self::Target {
126        &mut self.0
127    }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131    for DatabaseProviderRW<DB, N>
132{
133    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134        &self.0
135    }
136}
137
138impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
139    /// Commit database transaction and static file if it exists.
140    pub fn commit(self) -> ProviderResult<()> {
141        self.0.commit()
142    }
143
144    /// Consume `DbTx` or `DbTxMut`.
145    pub fn into_tx(self) -> <DB as Database>::TXMut {
146        self.0.into_tx()
147    }
148
149    /// Override the minimum pruning distance for testing purposes.
150    #[cfg(any(test, feature = "test-utils"))]
151    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
152        self.0.minimum_pruning_distance = distance;
153        self
154    }
155}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158    for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161        provider.0
162    }
163}
164
165/// Mode for [`DatabaseProvider::save_blocks`].
166#[derive(Debug, Clone, Copy, PartialEq, Eq)]
167pub enum SaveBlocksMode {
168    /// Full mode: write block structure + receipts + state + trie.
169    /// Used by engine/production code.
170    Full,
171    /// Blocks only: write block structure (headers, txs, senders, indices).
172    /// Receipts/state/trie are skipped - they may come later via separate calls.
173    /// Used by `insert_block`.
174    BlocksOnly,
175}
176
177impl SaveBlocksMode {
178    /// Returns `true` if this is [`SaveBlocksMode::Full`].
179    pub const fn with_state(self) -> bool {
180        matches!(self, Self::Full)
181    }
182}
183
184/// A provider struct that fetches data from the database.
185/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
186pub struct DatabaseProvider<TX, N: NodeTypes> {
187    /// Database transaction.
188    tx: TX,
189    /// Chain spec
190    chain_spec: Arc<N::ChainSpec>,
191    /// Static File provider
192    static_file_provider: StaticFileProvider<N::Primitives>,
193    /// Pruning configuration
194    prune_modes: PruneModes,
195    /// Node storage handler.
196    storage: Arc<N::Storage>,
197    /// Storage configuration settings for this node
198    storage_settings: Arc<RwLock<StorageSettings>>,
199    /// `RocksDB` provider
200    rocksdb_provider: RocksDBProvider,
201    /// Changeset cache for trie unwinding
202    changeset_cache: ChangesetCache,
203    /// Task runtime for spawning parallel I/O work.
204    runtime: reth_tasks::Runtime,
205    /// Path to the database directory.
206    db_path: PathBuf,
207    /// Pending `RocksDB` batches to be committed at provider commit time.
208    pending_rocksdb_batches: PendingRocksDBBatches,
209    /// Commit order for database operations.
210    commit_order: CommitOrder,
211    /// Minimum distance from tip required for pruning
212    minimum_pruning_distance: u64,
213    /// Database provider metrics
214    metrics: metrics::DatabaseProviderMetrics,
215}
216
217impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        let mut s = f.debug_struct("DatabaseProvider");
220        s.field("tx", &self.tx)
221            .field("chain_spec", &self.chain_spec)
222            .field("static_file_provider", &self.static_file_provider)
223            .field("prune_modes", &self.prune_modes)
224            .field("storage", &self.storage)
225            .field("storage_settings", &self.storage_settings)
226            .field("rocksdb_provider", &self.rocksdb_provider)
227            .field("changeset_cache", &self.changeset_cache)
228            .field("runtime", &self.runtime)
229            .field("pending_rocksdb_batches", &"<pending batches>")
230            .field("commit_order", &self.commit_order)
231            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
232            .finish()
233    }
234}
235
236impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
237    /// Returns reference to prune modes.
238    pub const fn prune_modes_ref(&self) -> &PruneModes {
239        &self.prune_modes
240    }
241
242    /// Sets the minimum pruning distance.
243    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
244        self.minimum_pruning_distance = distance;
245        self
246    }
247}
248
249impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
250    /// State provider for latest state
251    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
252        trace!(target: "providers::db", "Returning latest state provider");
253        Box::new(LatestStateProviderRef::new(self))
254    }
255
256    /// Storage provider for state at that given block hash
257    pub fn history_by_block_hash<'a>(
258        &'a self,
259        block_hash: BlockHash,
260    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
261        let mut block_number =
262            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
263        if block_number == self.best_block_number().unwrap_or_default() &&
264            block_number == self.last_block_number().unwrap_or_default()
265        {
266            return Ok(Box::new(LatestStateProviderRef::new(self)))
267        }
268
269        // +1 as the changeset that we want is the one that was applied after this block.
270        block_number += 1;
271
272        let account_history_prune_checkpoint =
273            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
274        let storage_history_prune_checkpoint =
275            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
276
277        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
278
279        // If we pruned account or storage history, we can't return state on every historical block.
280        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
281        if let Some(prune_checkpoint_block_number) =
282            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
283        {
284            state_provider = state_provider.with_lowest_available_account_history_block_number(
285                prune_checkpoint_block_number + 1,
286            );
287        }
288        if let Some(prune_checkpoint_block_number) =
289            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
290        {
291            state_provider = state_provider.with_lowest_available_storage_history_block_number(
292                prune_checkpoint_block_number + 1,
293            );
294        }
295
296        Ok(Box::new(state_provider))
297    }
298
299    #[cfg(feature = "test-utils")]
300    /// Sets the prune modes for provider.
301    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
302        self.prune_modes = prune_modes;
303    }
304}
305
306impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
307    type Primitives = N::Primitives;
308}
309
310impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
311    /// Returns a static file provider
312    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
313        self.static_file_provider.clone()
314    }
315
316    fn get_static_file_writer(
317        &self,
318        block: BlockNumber,
319        segment: StaticFileSegment,
320    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
321        self.static_file_provider.get_writer(block, segment)
322    }
323}
324
325impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
326    /// Returns the `RocksDB` provider.
327    fn rocksdb_provider(&self) -> RocksDBProvider {
328        self.rocksdb_provider.clone()
329    }
330
331    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
332        self.pending_rocksdb_batches.lock().push(batch);
333    }
334
335    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
336        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
337        for batch in batches {
338            self.rocksdb_provider.commit_batch(batch)?;
339        }
340        Ok(())
341    }
342}
343
344impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
345    for DatabaseProvider<TX, N>
346{
347    type ChainSpec = N::ChainSpec;
348
349    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
350        self.chain_spec.clone()
351    }
352}
353
354impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
355    /// Creates a provider with an inner read-write transaction.
356    #[allow(clippy::too_many_arguments)]
357    fn new_rw_inner(
358        tx: TX,
359        chain_spec: Arc<N::ChainSpec>,
360        static_file_provider: StaticFileProvider<N::Primitives>,
361        prune_modes: PruneModes,
362        storage: Arc<N::Storage>,
363        storage_settings: Arc<RwLock<StorageSettings>>,
364        rocksdb_provider: RocksDBProvider,
365        changeset_cache: ChangesetCache,
366        runtime: reth_tasks::Runtime,
367        db_path: PathBuf,
368        commit_order: CommitOrder,
369    ) -> Self {
370        Self {
371            tx,
372            chain_spec,
373            static_file_provider,
374            prune_modes,
375            storage,
376            storage_settings,
377            rocksdb_provider,
378            changeset_cache,
379            runtime,
380            db_path,
381            pending_rocksdb_batches: Default::default(),
382            commit_order,
383            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
384            metrics: metrics::DatabaseProviderMetrics::default(),
385        }
386    }
387
388    /// Creates a provider with an inner read-write transaction using normal commit order.
389    #[allow(clippy::too_many_arguments)]
390    pub fn new_rw(
391        tx: TX,
392        chain_spec: Arc<N::ChainSpec>,
393        static_file_provider: StaticFileProvider<N::Primitives>,
394        prune_modes: PruneModes,
395        storage: Arc<N::Storage>,
396        storage_settings: Arc<RwLock<StorageSettings>>,
397        rocksdb_provider: RocksDBProvider,
398        changeset_cache: ChangesetCache,
399        runtime: reth_tasks::Runtime,
400        db_path: PathBuf,
401    ) -> Self {
402        Self::new_rw_inner(
403            tx,
404            chain_spec,
405            static_file_provider,
406            prune_modes,
407            storage,
408            storage_settings,
409            rocksdb_provider,
410            changeset_cache,
411            runtime,
412            db_path,
413            CommitOrder::Normal,
414        )
415    }
416
417    /// Creates a provider with an inner read-write transaction using unwind commit order.
418    #[allow(clippy::too_many_arguments)]
419    pub fn new_unwind_rw(
420        tx: TX,
421        chain_spec: Arc<N::ChainSpec>,
422        static_file_provider: StaticFileProvider<N::Primitives>,
423        prune_modes: PruneModes,
424        storage: Arc<N::Storage>,
425        storage_settings: Arc<RwLock<StorageSettings>>,
426        rocksdb_provider: RocksDBProvider,
427        changeset_cache: ChangesetCache,
428        runtime: reth_tasks::Runtime,
429        db_path: PathBuf,
430    ) -> Self {
431        Self::new_rw_inner(
432            tx,
433            chain_spec,
434            static_file_provider,
435            prune_modes,
436            storage,
437            storage_settings,
438            rocksdb_provider,
439            changeset_cache,
440            runtime,
441            db_path,
442            CommitOrder::Unwind,
443        )
444    }
445}
446
447impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
448    fn as_ref(&self) -> &Self {
449        self
450    }
451}
452
453impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
454    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
455    ///
456    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
457    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
458    where
459        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
460    {
461        let rocksdb = self.rocksdb_provider();
462        let rocksdb_batch = rocksdb.batch();
463
464        let (result, raw_batch) = f(rocksdb_batch)?;
465
466        if let Some(batch) = raw_batch {
467            self.set_pending_rocksdb_batch(batch);
468        }
469        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
470
471        Ok(result)
472    }
473
474    /// Creates the context for static file writes.
475    fn static_file_write_ctx(
476        &self,
477        save_mode: SaveBlocksMode,
478        first_block: BlockNumber,
479        last_block: BlockNumber,
480    ) -> ProviderResult<StaticFileWriteCtx> {
481        let tip = self.last_block_number()?.max(last_block);
482        Ok(StaticFileWriteCtx {
483            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
484                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
485            write_receipts: save_mode.with_state() &&
486                EitherWriter::receipts_destination(self).is_static_file(),
487            write_account_changesets: save_mode.with_state() &&
488                EitherWriterDestination::account_changesets(self).is_static_file(),
489            write_storage_changesets: save_mode.with_state() &&
490                EitherWriterDestination::storage_changesets(self).is_static_file(),
491            tip,
492            receipts_prune_mode: self.prune_modes.receipts,
493            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
494            receipts_prunable: self
495                .static_file_provider
496                .get_highest_static_file_tx(StaticFileSegment::Receipts)
497                .is_none() &&
498                PruneMode::Distance(self.minimum_pruning_distance)
499                    .should_prune(first_block, tip),
500        })
501    }
502
503    /// Creates the context for `RocksDB` writes.
504    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
505        RocksDBWriteCtx {
506            first_block_number: first_block,
507            prune_tx_lookup: self.prune_modes.transaction_lookup,
508            storage_settings: self.cached_storage_settings(),
509            pending_batches: self.pending_rocksdb_batches.clone(),
510        }
511    }
512
513    /// Writes executed blocks and state to storage.
514    ///
515    /// This method parallelizes static file (SF) writes with MDBX writes.
516    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
517    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
518    ///
519    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
520    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
521    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
522    pub fn save_blocks(
523        &self,
524        blocks: Vec<ExecutedBlock<N::Primitives>>,
525        save_mode: SaveBlocksMode,
526    ) -> ProviderResult<()> {
527        if blocks.is_empty() {
528            debug!(target: "providers::db", "Attempted to write empty block range");
529            return Ok(())
530        }
531
532        let total_start = Instant::now();
533        let block_count = blocks.len() as u64;
534        let first_number = blocks.first().unwrap().recovered_block().number();
535        let last_block_number = blocks.last().unwrap().recovered_block().number();
536
537        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
538
539        // Compute tx_nums upfront (both threads need these)
540        let first_tx_num = self
541            .tx
542            .cursor_read::<tables::TransactionBlocks>()?
543            .last()?
544            .map(|(n, _)| n + 1)
545            .unwrap_or_default();
546
547        let tx_nums: Vec<TxNumber> = {
548            let mut nums = Vec::with_capacity(blocks.len());
549            let mut current = first_tx_num;
550            for block in &blocks {
551                nums.push(current);
552                current += block.recovered_block().body().transaction_count() as u64;
553            }
554            nums
555        };
556
557        let mut timings =
558            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
559
560        // avoid capturing &self.tx in scope below.
561        let sf_provider = &self.static_file_provider;
562        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
563        let rocksdb_provider = self.rocksdb_provider.clone();
564        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
565        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
566
567        let mut sf_result = None;
568        let mut rocksdb_result = None;
569
570        // Write to all backends in parallel.
571        let runtime = &self.runtime;
572        // Propagate tracing context into rayon-spawned threads so that static file
573        // and RocksDB write spans appear as children of save_blocks in traces.
574        let span = tracing::Span::current();
575        runtime.storage_pool().in_place_scope(|s| {
576            // SF writes
577            s.spawn(|_| {
578                let _guard = span.enter();
579                let start = Instant::now();
580                sf_result = Some(
581                    sf_provider
582                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
583                        .map(|()| start.elapsed()),
584                );
585            });
586
587            // RocksDB writes
588            if rocksdb_enabled {
589                s.spawn(|_| {
590                    let _guard = span.enter();
591                    let start = Instant::now();
592                    rocksdb_result = Some(
593                        rocksdb_provider
594                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
595                            .map(|()| start.elapsed()),
596                    );
597                });
598            }
599
600            // MDBX writes
601            let mdbx_start = Instant::now();
602
603            // Collect all transaction hashes across all blocks, sort them, and write in batch
604            if !self.cached_storage_settings().storage_v2 &&
605                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
606            {
607                let start = Instant::now();
608                let total_tx_count: usize =
609                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
610                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
611                for (i, block) in blocks.iter().enumerate() {
612                    let recovered_block = block.recovered_block();
613                    for (tx_num, transaction) in
614                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
615                    {
616                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
617                    }
618                }
619
620                // Sort by hash for optimal MDBX insertion performance
621                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
622
623                // Write all transaction hash numbers in a single batch
624                self.with_rocksdb_batch(|batch| {
625                    let mut tx_hash_writer =
626                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
627                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
628                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
629                    Ok(((), raw_batch))
630                })?;
631                self.metrics.record_duration(
632                    metrics::Action::InsertTransactionHashNumbers,
633                    start.elapsed(),
634                );
635            }
636
637            for (i, block) in blocks.iter().enumerate() {
638                let recovered_block = block.recovered_block();
639
640                let start = Instant::now();
641                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
642                timings.insert_block += start.elapsed();
643
644                if save_mode.with_state() {
645                    let execution_output = block.execution_outcome();
646
647                    // Write state and changesets to the database.
648                    // Must be written after blocks because of the receipt lookup.
649                    // Skip receipts/account changesets if they're being written to static files.
650                    let start = Instant::now();
651                    self.write_state(
652                        WriteStateInput::Single {
653                            outcome: execution_output,
654                            block: recovered_block.number(),
655                        },
656                        OriginalValuesKnown::No,
657                        StateWriteConfig {
658                            write_receipts: !sf_ctx.write_receipts,
659                            write_account_changesets: !sf_ctx.write_account_changesets,
660                            write_storage_changesets: !sf_ctx.write_storage_changesets,
661                        },
662                    )?;
663                    timings.write_state += start.elapsed();
664                }
665            }
666
667            // Write all hashed state and trie updates in single batches.
668            // This reduces cursor open/close overhead from N calls to 1.
669            if save_mode.with_state() {
670                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
671                let start = Instant::now();
672                let merged_hashed_state = HashedPostStateSorted::merge_batch(
673                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
674                );
675                if !merged_hashed_state.is_empty() {
676                    self.write_hashed_state(&merged_hashed_state)?;
677                }
678                timings.write_hashed_state += start.elapsed();
679
680                let start = Instant::now();
681                let merged_trie =
682                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
683                if !merged_trie.is_empty() {
684                    self.write_trie_updates_sorted(&merged_trie)?;
685                }
686                timings.write_trie_updates += start.elapsed();
687            }
688
689            // Full mode: update history indices
690            if save_mode.with_state() {
691                let start = Instant::now();
692                self.update_history_indices(first_number..=last_block_number)?;
693                timings.update_history_indices = start.elapsed();
694            }
695
696            // Update pipeline progress
697            let start = Instant::now();
698            self.update_pipeline_stages(last_block_number, false)?;
699            timings.update_pipeline_stages = start.elapsed();
700
701            timings.mdbx = mdbx_start.elapsed();
702
703            Ok::<_, ProviderError>(())
704        })?;
705
706        // Collect results from spawned tasks
707        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
708
709        if rocksdb_enabled {
710            timings.rocksdb = rocksdb_result.ok_or_else(|| {
711                ProviderError::Database(reth_db_api::DatabaseError::Other(
712                    "RocksDB thread panicked".into(),
713                ))
714            })??;
715        }
716
717        timings.total = total_start.elapsed();
718
719        self.metrics.record_save_blocks(&timings);
720        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
721
722        Ok(())
723    }
724
725    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
726    ///
727    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
728    #[instrument(level = "debug", target = "providers::db", skip_all)]
729    fn insert_block_mdbx_only(
730        &self,
731        block: &RecoveredBlock<BlockTy<N>>,
732        first_tx_num: TxNumber,
733    ) -> ProviderResult<StoredBlockBodyIndices> {
734        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
735            EitherWriterDestination::senders(self).is_database()
736        {
737            let start = Instant::now();
738            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
739            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
740            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
741                cursor.append(tx_num, &sender)?;
742            }
743            self.metrics
744                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
745        }
746
747        let block_number = block.number();
748        let tx_count = block.body().transaction_count() as u64;
749
750        let start = Instant::now();
751        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
752        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
753
754        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
755
756        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
757    }
758
759    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
760    /// `Ommers`/`Withdrawals`).
761    fn write_block_body_indices(
762        &self,
763        block_number: BlockNumber,
764        body: &BodyTy<N>,
765        first_tx_num: TxNumber,
766        tx_count: u64,
767    ) -> ProviderResult<()> {
768        // MDBX: BlockBodyIndices
769        let start = Instant::now();
770        self.tx
771            .cursor_write::<tables::BlockBodyIndices>()?
772            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
773        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
774
775        // MDBX: TransactionBlocks (last tx -> block mapping)
776        if tx_count > 0 {
777            let start = Instant::now();
778            self.tx
779                .cursor_write::<tables::TransactionBlocks>()?
780                .append(first_tx_num + tx_count - 1, &block_number)?;
781            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
782        }
783
784        // MDBX: Ommers/Withdrawals
785        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
786
787        Ok(())
788    }
789
790    /// Unwinds trie state starting at and including the given block.
791    ///
792    /// This includes calculating the resulted state root and comparing it with the parent block
793    /// state root.
794    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
795        let changed_accounts = self.account_changesets_range(from..)?;
796
797        // Unwind account hashes.
798        self.unwind_account_hashing(changed_accounts.iter())?;
799
800        // Unwind account history indices.
801        self.unwind_account_history_indices(changed_accounts.iter())?;
802
803        let changed_storages = self.storage_changesets_range(from..)?;
804
805        // Unwind storage hashes.
806        self.unwind_storage_hashing(changed_storages.iter().copied())?;
807
808        // Unwind storage history indices.
809        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
810
811        // Unwind accounts/storages trie tables using the revert.
812        // Get the database tip block number
813        let db_tip_block = self
814            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
815            .as_ref()
816            .map(|chk| chk.block_number)
817            .ok_or_else(|| ProviderError::InsufficientChangesets {
818                requested: from,
819                available: 0..=0,
820            })?;
821
822        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
823        self.write_trie_updates_sorted(&trie_revert)?;
824
825        Ok(())
826    }
827
828    /// Removes receipts from all transactions starting with provided number (inclusive).
829    fn remove_receipts_from(
830        &self,
831        from_tx: TxNumber,
832        last_block: BlockNumber,
833    ) -> ProviderResult<()> {
834        // iterate over block body and remove receipts
835        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
836
837        if EitherWriter::receipts_destination(self).is_static_file() {
838            let static_file_receipt_num =
839                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
840
841            let to_delete = static_file_receipt_num
842                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
843                .unwrap_or_default();
844
845            self.static_file_provider
846                .latest_writer(StaticFileSegment::Receipts)?
847                .prune_receipts(to_delete, last_block)?;
848        }
849
850        Ok(())
851    }
852
853    /// Writes bytecodes to MDBX.
854    fn write_bytecodes(
855        &self,
856        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
857    ) -> ProviderResult<()> {
858        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
859        for (hash, bytecode) in bytecodes {
860            bytecodes_cursor.upsert(hash, &bytecode)?;
861        }
862        Ok(())
863    }
864}
865
866impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
867    fn try_into_history_at_block(
868        self,
869        mut block_number: BlockNumber,
870    ) -> ProviderResult<StateProviderBox> {
871        let best_block = self.best_block_number().unwrap_or_default();
872
873        // Reject requests for blocks beyond the best block
874        if block_number > best_block {
875            return Err(ProviderError::BlockNotExecuted {
876                requested: block_number,
877                executed: best_block,
878            });
879        }
880
881        // If requesting state at the best block, use the latest state provider
882        if block_number == best_block {
883            return Ok(Box::new(LatestStateProvider::new(self)));
884        }
885
886        // +1 as the changeset that we want is the one that was applied after this block.
887        block_number += 1;
888
889        let account_history_prune_checkpoint =
890            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
891        let storage_history_prune_checkpoint =
892            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
893
894        let mut state_provider = HistoricalStateProvider::new(self, block_number);
895
896        // If we pruned account or storage history, we can't return state on every historical block.
897        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
898        if let Some(prune_checkpoint_block_number) =
899            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
900        {
901            state_provider = state_provider.with_lowest_available_account_history_block_number(
902                prune_checkpoint_block_number + 1,
903            );
904        }
905        if let Some(prune_checkpoint_block_number) =
906            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
907        {
908            state_provider = state_provider.with_lowest_available_storage_history_block_number(
909                prune_checkpoint_block_number + 1,
910            );
911        }
912
913        Ok(Box::new(state_provider))
914    }
915}
916
917/// For a given key, unwind all history shards that contain block numbers at or above the given
918/// block number.
919///
920/// S - Sharded key subtype.
921/// T - Table to walk over.
922/// C - Cursor implementation.
923///
924/// This function walks the entries from the given start key and deletes all shards that belong to
925/// the key and contain block numbers at or above the given block number. Shards entirely below
926/// the block number are preserved.
927///
928/// The boundary shard (the shard that spans across the block number) is removed from the database.
929/// Any indices that are below the block number are filtered out and returned for reinsertion.
930/// The boundary shard is returned for reinsertion (if it's not empty).
931fn unwind_history_shards<S, T, C>(
932    cursor: &mut C,
933    start_key: T::Key,
934    block_number: BlockNumber,
935    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
936) -> ProviderResult<Vec<u64>>
937where
938    T: Table<Value = BlockNumberList>,
939    T::Key: AsRef<ShardedKey<S>>,
940    C: DbCursorRO<T> + DbCursorRW<T>,
941{
942    // Start from the given key and iterate through shards
943    let mut item = cursor.seek_exact(start_key)?;
944    while let Some((sharded_key, list)) = item {
945        // If the shard does not belong to the key, break.
946        if !shard_belongs_to_key(&sharded_key) {
947            break
948        }
949
950        // Always delete the current shard from the database first
951        // We'll decide later what (if anything) to reinsert
952        cursor.delete_current()?;
953
954        // Get the first (lowest) block number in this shard
955        // All block numbers in a shard are sorted in ascending order
956        let first = list.iter().next().expect("List can't be empty");
957
958        // Case 1: Entire shard is at or above the unwinding point
959        // Keep it deleted (don't return anything for reinsertion)
960        if first >= block_number {
961            item = cursor.prev()?;
962            continue
963        }
964        // Case 2: This is a boundary shard (spans across the unwinding point)
965        // The shard contains some blocks below and some at/above the unwinding point
966        else if block_number <= sharded_key.as_ref().highest_block_number {
967            // Return only the block numbers that are below the unwinding point
968            // These will be reinserted to preserve the historical data
969            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
970        }
971        // Case 3: Entire shard is below the unwinding point
972        // Return all block numbers for reinsertion (preserve entire shard)
973        return Ok(list.iter().collect::<Vec<_>>())
974    }
975
976    // No shards found or all processed
977    Ok(Vec::new())
978}
979
980impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
981    /// Creates a provider with an inner read-only transaction.
982    #[allow(clippy::too_many_arguments)]
983    pub fn new(
984        tx: TX,
985        chain_spec: Arc<N::ChainSpec>,
986        static_file_provider: StaticFileProvider<N::Primitives>,
987        prune_modes: PruneModes,
988        storage: Arc<N::Storage>,
989        storage_settings: Arc<RwLock<StorageSettings>>,
990        rocksdb_provider: RocksDBProvider,
991        changeset_cache: ChangesetCache,
992        runtime: reth_tasks::Runtime,
993        db_path: PathBuf,
994    ) -> Self {
995        Self {
996            tx,
997            chain_spec,
998            static_file_provider,
999            prune_modes,
1000            storage,
1001            storage_settings,
1002            rocksdb_provider,
1003            changeset_cache,
1004            runtime,
1005            db_path,
1006            pending_rocksdb_batches: Default::default(),
1007            commit_order: CommitOrder::Normal,
1008            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1009            metrics: metrics::DatabaseProviderMetrics::default(),
1010        }
1011    }
1012
1013    /// Consume `DbTx` or `DbTxMut`.
1014    pub fn into_tx(self) -> TX {
1015        self.tx
1016    }
1017
1018    /// Pass `DbTx` or `DbTxMut` mutable reference.
1019    pub const fn tx_mut(&mut self) -> &mut TX {
1020        &mut self.tx
1021    }
1022
1023    /// Pass `DbTx` or `DbTxMut` immutable reference.
1024    pub const fn tx_ref(&self) -> &TX {
1025        &self.tx
1026    }
1027
1028    /// Returns a reference to the chain specification.
1029    pub fn chain_spec(&self) -> &N::ChainSpec {
1030        &self.chain_spec
1031    }
1032}
1033
1034impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1035    fn recovered_block<H, HF, B, BF>(
1036        &self,
1037        id: BlockHashOrNumber,
1038        _transaction_kind: TransactionVariant,
1039        header_by_number: HF,
1040        construct_block: BF,
1041    ) -> ProviderResult<Option<B>>
1042    where
1043        H: AsRef<HeaderTy<N>>,
1044        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1045        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1046    {
1047        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1048        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1049
1050        // Get the block body
1051        //
1052        // If the body indices are not found, this means that the transactions either do not exist
1053        // in the database yet, or they do exit but are not indexed. If they exist but are not
1054        // indexed, we don't have enough information to return the block anyways, so we return
1055        // `None`.
1056        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1057
1058        let tx_range = body.tx_num_range();
1059
1060        let transactions = if tx_range.is_empty() {
1061            vec![]
1062        } else {
1063            self.transactions_by_tx_range(tx_range.clone())?
1064        };
1065
1066        let body = self
1067            .storage
1068            .reader()
1069            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1070            .pop()
1071            .ok_or(ProviderError::InvalidStorageOutput)?;
1072
1073        let senders = if tx_range.is_empty() {
1074            vec![]
1075        } else {
1076            let known_senders: HashMap<TxNumber, Address> =
1077                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1078
1079            let mut senders = Vec::with_capacity(body.transactions().len());
1080            for (tx_num, tx) in tx_range.zip(body.transactions()) {
1081                match known_senders.get(&tx_num) {
1082                    None => {
1083                        let sender = tx.recover_signer_unchecked()?;
1084                        senders.push(sender);
1085                    }
1086                    Some(sender) => senders.push(*sender),
1087                }
1088            }
1089            senders
1090        };
1091
1092        construct_block(header, body, senders)
1093    }
1094
1095    /// Returns a range of blocks from the database.
1096    ///
1097    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1098    /// construct blocks from the following inputs:
1099    ///     – Header
1100    ///     - Range of transaction numbers
1101    ///     – Ommers
1102    ///     – Withdrawals
1103    ///     – Senders
1104    fn block_range<F, H, HF, R>(
1105        &self,
1106        range: RangeInclusive<BlockNumber>,
1107        headers_range: HF,
1108        mut assemble_block: F,
1109    ) -> ProviderResult<Vec<R>>
1110    where
1111        H: AsRef<HeaderTy<N>>,
1112        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1113        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1114    {
1115        if range.is_empty() {
1116            return Ok(Vec::new())
1117        }
1118
1119        let len = range.end().saturating_sub(*range.start()) as usize;
1120        let mut blocks = Vec::with_capacity(len);
1121
1122        let headers = headers_range(range.clone())?;
1123
1124        // If the body indices are not found, this means that the transactions either do
1125        // not exist in the database yet, or they do exit but are
1126        // not indexed. If they exist but are not indexed, we don't
1127        // have enough information to return the block anyways, so
1128        // we skip the block.
1129        let present_headers = self
1130            .block_body_indices_range(range)?
1131            .into_iter()
1132            .map(|b| b.tx_num_range())
1133            .zip(headers)
1134            .collect::<Vec<_>>();
1135
1136        let mut inputs = Vec::with_capacity(present_headers.len());
1137        for (tx_range, header) in &present_headers {
1138            let transactions = if tx_range.is_empty() {
1139                Vec::new()
1140            } else {
1141                self.transactions_by_tx_range(tx_range.clone())?
1142            };
1143
1144            inputs.push((header.as_ref(), transactions));
1145        }
1146
1147        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1148
1149        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1150            blocks.push(assemble_block(header, body, tx_range)?);
1151        }
1152
1153        Ok(blocks)
1154    }
1155
1156    /// Returns a range of blocks from the database, along with the senders of each
1157    /// transaction in the blocks.
1158    ///
1159    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1160    /// construct blocks from the following inputs:
1161    ///     – Header
1162    ///     - Transactions
1163    ///     – Ommers
1164    ///     – Withdrawals
1165    ///     – Senders
1166    fn block_with_senders_range<H, HF, B, BF>(
1167        &self,
1168        range: RangeInclusive<BlockNumber>,
1169        headers_range: HF,
1170        assemble_block: BF,
1171    ) -> ProviderResult<Vec<B>>
1172    where
1173        H: AsRef<HeaderTy<N>>,
1174        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1175        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1176    {
1177        self.block_range(range, headers_range, |header, body, tx_range| {
1178            let senders = if tx_range.is_empty() {
1179                Vec::new()
1180            } else {
1181                let known_senders: HashMap<TxNumber, Address> =
1182                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1183
1184                let mut senders = Vec::with_capacity(body.transactions().len());
1185                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1186                    match known_senders.get(&tx_num) {
1187                        None => {
1188                            // recover the sender from the transaction if not found
1189                            let sender = tx.recover_signer_unchecked()?;
1190                            senders.push(sender);
1191                        }
1192                        Some(sender) => senders.push(*sender),
1193                    }
1194                }
1195
1196                senders
1197            };
1198
1199            assemble_block(header, body, senders)
1200        })
1201    }
1202
1203    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
1204    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
1205    /// account changesets.
1206    fn populate_bundle_state<A, S>(
1207        &self,
1208        account_changeset: Vec<(u64, AccountBeforeTx)>,
1209        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1210        plain_accounts_cursor: &mut A,
1211        plain_storage_cursor: &mut S,
1212    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
1213    where
1214        A: DbCursorRO<PlainAccountState>,
1215        S: DbDupCursorRO<PlainStorageState>,
1216    {
1217        // iterate previous value and get plain state value to create changeset
1218        // Double option around Account represent if Account state is know (first option) and
1219        // account is removed (Second Option)
1220        let mut state: BundleStateInit = HashMap::default();
1221
1222        // This is not working for blocks that are not at tip. as plain state is not the last
1223        // state of end range. We should rename the functions or add support to access
1224        // History state. Accessing history state can be tricky but we are not gaining
1225        // anything.
1226
1227        let mut reverts: RevertsInit = HashMap::default();
1228
1229        // add account changeset changes
1230        for (block_number, account_before) in account_changeset.into_iter().rev() {
1231            let AccountBeforeTx { info: old_info, address } = account_before;
1232            match state.entry(address) {
1233                hash_map::Entry::Vacant(entry) => {
1234                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1235                    entry.insert((old_info, new_info, HashMap::default()));
1236                }
1237                hash_map::Entry::Occupied(mut entry) => {
1238                    // overwrite old account state.
1239                    entry.get_mut().0 = old_info;
1240                }
1241            }
1242            // insert old info into reverts.
1243            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1244        }
1245
1246        // add storage changeset changes
1247        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1248            let BlockNumberAddress((block_number, address)) = block_and_address;
1249            // get account state or insert from plain state.
1250            let account_state = match state.entry(address) {
1251                hash_map::Entry::Vacant(entry) => {
1252                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1253                    entry.insert((present_info, present_info, HashMap::default()))
1254                }
1255                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1256            };
1257
1258            // match storage.
1259            match account_state.2.entry(old_storage.key) {
1260                hash_map::Entry::Vacant(entry) => {
1261                    let new_storage = plain_storage_cursor
1262                        .seek_by_key_subkey(address, old_storage.key)?
1263                        .filter(|storage| storage.key == old_storage.key)
1264                        .unwrap_or_default();
1265                    entry.insert((old_storage.value, new_storage.value));
1266                }
1267                hash_map::Entry::Occupied(mut entry) => {
1268                    entry.get_mut().0 = old_storage.value;
1269                }
1270            };
1271
1272            reverts
1273                .entry(block_number)
1274                .or_default()
1275                .entry(address)
1276                .or_default()
1277                .1
1278                .push(old_storage);
1279        }
1280
1281        Ok((state, reverts))
1282    }
1283
1284    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
1285    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
1286    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
1287    /// plain address and plain storage key.
1288    fn populate_bundle_state_hashed(
1289        &self,
1290        account_changeset: Vec<(u64, AccountBeforeTx)>,
1291        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1292        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1293        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1294    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1295        let mut state: BundleStateInit = HashMap::default();
1296        let mut reverts: RevertsInit = HashMap::default();
1297
1298        // add account changeset changes
1299        for (block_number, account_before) in account_changeset.into_iter().rev() {
1300            let AccountBeforeTx { info: old_info, address } = account_before;
1301            match state.entry(address) {
1302                hash_map::Entry::Vacant(entry) => {
1303                    let hashed_address = keccak256(address);
1304                    let new_info =
1305                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1306                    entry.insert((old_info, new_info, HashMap::default()));
1307                }
1308                hash_map::Entry::Occupied(mut entry) => {
1309                    entry.get_mut().0 = old_info;
1310                }
1311            }
1312            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1313        }
1314
1315        // add storage changeset changes
1316        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1317            let BlockNumberAddress((block_number, address)) = block_and_address;
1318            let account_state = match state.entry(address) {
1319                hash_map::Entry::Vacant(entry) => {
1320                    let hashed_address = keccak256(address);
1321                    let present_info =
1322                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
1323                    entry.insert((present_info, present_info, HashMap::default()))
1324                }
1325                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1326            };
1327
1328            // Storage keys in changesets are plain; hash them for HashedStorages lookup.
1329            let hashed_storage_key = keccak256(old_storage.key);
1330            match account_state.2.entry(old_storage.key) {
1331                hash_map::Entry::Vacant(entry) => {
1332                    let hashed_address = keccak256(address);
1333                    let new_storage = hashed_storage_cursor
1334                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
1335                        .filter(|storage| storage.key == hashed_storage_key)
1336                        .unwrap_or_default();
1337                    entry.insert((old_storage.value, new_storage.value));
1338                }
1339                hash_map::Entry::Occupied(mut entry) => {
1340                    entry.get_mut().0 = old_storage.value;
1341                }
1342            };
1343
1344            reverts
1345                .entry(block_number)
1346                .or_default()
1347                .entry(address)
1348                .or_default()
1349                .1
1350                .push(old_storage);
1351        }
1352
1353        Ok((state, reverts))
1354    }
1355}
1356
1357impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1358    /// Insert history index to the database.
1359    ///
1360    /// For each updated partial key, this function retrieves the last shard from the database
1361    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1362    /// the shards back into the database.
1363    ///
1364    /// This function is used by history indexing stages.
1365    fn append_history_index<P, T>(
1366        &self,
1367        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1368        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1369    ) -> ProviderResult<()>
1370    where
1371        P: Copy,
1372        T: Table<Value = BlockNumberList>,
1373    {
1374        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1375        // will append duplicate entries instead of updating existing ones, causing data corruption.
1376        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1377
1378        let mut cursor = self.tx.cursor_write::<T>()?;
1379
1380        for (partial_key, indices) in index_updates {
1381            let last_key = sharded_key_factory(partial_key, u64::MAX);
1382            let mut last_shard = cursor
1383                .seek_exact(last_key.clone())?
1384                .map(|(_, list)| list)
1385                .unwrap_or_else(BlockNumberList::empty);
1386
1387            last_shard.append(indices).map_err(ProviderError::other)?;
1388
1389            // fast path: all indices fit in one shard
1390            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1391                cursor.upsert(last_key, &last_shard)?;
1392                continue;
1393            }
1394
1395            // slow path: rechunk into multiple shards
1396            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1397            let mut chunks_peekable = chunks.into_iter().peekable();
1398
1399            while let Some(chunk) = chunks_peekable.next() {
1400                let shard = BlockNumberList::new_pre_sorted(chunk);
1401                let highest_block_number = if chunks_peekable.peek().is_some() {
1402                    shard.iter().next_back().expect("`chunks` does not return empty list")
1403                } else {
1404                    // Insert last list with `u64::MAX`.
1405                    u64::MAX
1406                };
1407
1408                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1409            }
1410        }
1411
1412        Ok(())
1413    }
1414}
1415
1416impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1417    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1418        if self.cached_storage_settings().use_hashed_state() {
1419            let hashed_address = keccak256(address);
1420            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1421        } else {
1422            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1423        }
1424    }
1425}
1426
1427impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1428    fn changed_accounts_with_range(
1429        &self,
1430        range: RangeInclusive<BlockNumber>,
1431    ) -> ProviderResult<BTreeSet<Address>> {
1432        let mut reader = EitherReader::new_account_changesets(self)?;
1433
1434        reader.changed_accounts_with_range(range)
1435    }
1436
1437    fn basic_accounts(
1438        &self,
1439        iter: impl IntoIterator<Item = Address>,
1440    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1441        if self.cached_storage_settings().use_hashed_state() {
1442            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1443            Ok(iter
1444                .into_iter()
1445                .map(|address| {
1446                    let hashed_address = keccak256(address);
1447                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1448                })
1449                .collect::<Result<Vec<_>, _>>()?)
1450        } else {
1451            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1452            Ok(iter
1453                .into_iter()
1454                .map(|address| {
1455                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1456                })
1457                .collect::<Result<Vec<_>, _>>()?)
1458        }
1459    }
1460
1461    fn changed_accounts_and_blocks_with_range(
1462        &self,
1463        range: RangeInclusive<BlockNumber>,
1464    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1465        let highest_static_block = self
1466            .static_file_provider
1467            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1468
1469        if let Some(highest) = highest_static_block &&
1470            self.cached_storage_settings().storage_v2
1471        {
1472            let start = *range.start();
1473            let static_end = (*range.end()).min(highest);
1474
1475            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1476            if start <= static_end {
1477                for block in start..=static_end {
1478                    let block_changesets = self.account_block_changeset(block)?;
1479                    for changeset in block_changesets {
1480                        changed_accounts_and_blocks
1481                            .entry(changeset.address)
1482                            .or_default()
1483                            .push(block);
1484                    }
1485                }
1486            }
1487
1488            Ok(changed_accounts_and_blocks)
1489        } else {
1490            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1491
1492            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1493                BTreeMap::new(),
1494                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1495                    let (index, account) = entry?;
1496                    accounts.entry(account.address).or_default().push(index);
1497                    Ok(accounts)
1498                },
1499            )?;
1500
1501            Ok(account_transitions)
1502        }
1503    }
1504}
1505
1506impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1507    fn storage_changeset(
1508        &self,
1509        block_number: BlockNumber,
1510    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1511        if self.cached_storage_settings().storage_v2 {
1512            self.static_file_provider.storage_changeset(block_number)
1513        } else {
1514            let range = block_number..=block_number;
1515            let storage_range = BlockNumberAddress::range(range);
1516            self.tx
1517                .cursor_dup_read::<tables::StorageChangeSets>()?
1518                .walk_range(storage_range)?
1519                .map(|r| {
1520                    let (bna, entry) = r?;
1521                    Ok((bna, entry))
1522                })
1523                .collect()
1524        }
1525    }
1526
1527    fn get_storage_before_block(
1528        &self,
1529        block_number: BlockNumber,
1530        address: Address,
1531        storage_key: B256,
1532    ) -> ProviderResult<Option<StorageEntry>> {
1533        if self.cached_storage_settings().storage_v2 {
1534            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1535        } else {
1536            Ok(self
1537                .tx
1538                .cursor_dup_read::<tables::StorageChangeSets>()?
1539                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1540                .filter(|entry| entry.key == storage_key))
1541        }
1542    }
1543
1544    fn storage_changesets_range(
1545        &self,
1546        range: impl RangeBounds<BlockNumber>,
1547    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1548        if self.cached_storage_settings().storage_v2 {
1549            self.static_file_provider.storage_changesets_range(range)
1550        } else {
1551            self.tx
1552                .cursor_dup_read::<tables::StorageChangeSets>()?
1553                .walk_range(BlockNumberAddressRange::from(range))?
1554                .map(|r| {
1555                    let (bna, entry) = r?;
1556                    Ok((bna, entry))
1557                })
1558                .collect()
1559        }
1560    }
1561
1562    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1563        if self.cached_storage_settings().storage_v2 {
1564            self.static_file_provider.storage_changeset_count()
1565        } else {
1566            Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1567        }
1568    }
1569}
1570
1571impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1572    fn account_block_changeset(
1573        &self,
1574        block_number: BlockNumber,
1575    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1576        if self.cached_storage_settings().storage_v2 {
1577            let static_changesets =
1578                self.static_file_provider.account_block_changeset(block_number)?;
1579            Ok(static_changesets)
1580        } else {
1581            let range = block_number..=block_number;
1582            self.tx
1583                .cursor_read::<tables::AccountChangeSets>()?
1584                .walk_range(range)?
1585                .map(|result| -> ProviderResult<_> {
1586                    let (_, account_before) = result?;
1587                    Ok(account_before)
1588                })
1589                .collect()
1590        }
1591    }
1592
1593    fn get_account_before_block(
1594        &self,
1595        block_number: BlockNumber,
1596        address: Address,
1597    ) -> ProviderResult<Option<AccountBeforeTx>> {
1598        if self.cached_storage_settings().storage_v2 {
1599            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1600        } else {
1601            self.tx
1602                .cursor_dup_read::<tables::AccountChangeSets>()?
1603                .seek_by_key_subkey(block_number, address)?
1604                .filter(|acc| acc.address == address)
1605                .map(Ok)
1606                .transpose()
1607        }
1608    }
1609
1610    fn account_changesets_range(
1611        &self,
1612        range: impl core::ops::RangeBounds<BlockNumber>,
1613    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1614        if self.cached_storage_settings().storage_v2 {
1615            self.static_file_provider.account_changesets_range(range)
1616        } else {
1617            self.tx
1618                .cursor_read::<tables::AccountChangeSets>()?
1619                .walk_range(to_range(range))?
1620                .map(|r| r.map_err(Into::into))
1621                .collect()
1622        }
1623    }
1624
1625    fn account_changeset_count(&self) -> ProviderResult<usize> {
1626        // check if account changesets are in static files, otherwise just count the changeset
1627        // entries in the DB
1628        if self.cached_storage_settings().storage_v2 {
1629            self.static_file_provider.account_changeset_count()
1630        } else {
1631            Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1632        }
1633    }
1634}
1635
1636impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1637    for DatabaseProvider<TX, N>
1638{
1639    type Header = HeaderTy<N>;
1640
1641    fn local_tip_header(
1642        &self,
1643        highest_uninterrupted_block: BlockNumber,
1644    ) -> ProviderResult<SealedHeader<Self::Header>> {
1645        let static_file_provider = self.static_file_provider();
1646
1647        // Make sure Headers static file is at the same height. If it's further, this
1648        // input execution was interrupted previously and we need to unwind the static file.
1649        let next_static_file_block_num = static_file_provider
1650            .get_highest_static_file_block(StaticFileSegment::Headers)
1651            .map(|id| id + 1)
1652            .unwrap_or_default();
1653        let next_block = highest_uninterrupted_block + 1;
1654
1655        match next_static_file_block_num.cmp(&next_block) {
1656            // The node shutdown between an executed static file commit and before the database
1657            // commit, so we need to unwind the static files.
1658            Ordering::Greater => {
1659                let mut static_file_producer =
1660                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1661                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1662                // Since this is a database <-> static file inconsistency, we commit the change
1663                // straight away.
1664                static_file_producer.commit()?
1665            }
1666            Ordering::Less => {
1667                // There's either missing or corrupted files.
1668                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1669            }
1670            Ordering::Equal => {}
1671        }
1672
1673        let local_head = static_file_provider
1674            .sealed_header(highest_uninterrupted_block)?
1675            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1676
1677        Ok(local_head)
1678    }
1679}
1680
1681impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1682    type Header = HeaderTy<N>;
1683
1684    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1685        if let Some(num) = self.block_number(block_hash)? {
1686            Ok(self.header_by_number(num)?)
1687        } else {
1688            Ok(None)
1689        }
1690    }
1691
1692    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1693        self.static_file_provider.header_by_number(num)
1694    }
1695
1696    fn headers_range(
1697        &self,
1698        range: impl RangeBounds<BlockNumber>,
1699    ) -> ProviderResult<Vec<Self::Header>> {
1700        self.static_file_provider.headers_range(range)
1701    }
1702
1703    fn sealed_header(
1704        &self,
1705        number: BlockNumber,
1706    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1707        self.static_file_provider.sealed_header(number)
1708    }
1709
1710    fn sealed_headers_while(
1711        &self,
1712        range: impl RangeBounds<BlockNumber>,
1713        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1714    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1715        self.static_file_provider.sealed_headers_while(range, predicate)
1716    }
1717}
1718
1719impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1720    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1721        self.static_file_provider.block_hash(number)
1722    }
1723
1724    fn canonical_hashes_range(
1725        &self,
1726        start: BlockNumber,
1727        end: BlockNumber,
1728    ) -> ProviderResult<Vec<B256>> {
1729        self.static_file_provider.canonical_hashes_range(start, end)
1730    }
1731}
1732
1733impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1734    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1735        let best_number = self.best_block_number()?;
1736        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1737        Ok(ChainInfo { best_hash, best_number })
1738    }
1739
1740    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1741        // The best block number is tracked via the finished stage which gets updated in the same tx
1742        // when new blocks committed
1743        Ok(self
1744            .get_stage_checkpoint(StageId::Finish)?
1745            .map(|checkpoint| checkpoint.block_number)
1746            .unwrap_or_default())
1747    }
1748
1749    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1750        self.static_file_provider.last_block_number()
1751    }
1752
1753    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1754        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1755    }
1756}
1757
1758impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1759    type Block = BlockTy<N>;
1760
1761    fn find_block_by_hash(
1762        &self,
1763        hash: B256,
1764        source: BlockSource,
1765    ) -> ProviderResult<Option<Self::Block>> {
1766        if source.is_canonical() {
1767            self.block(hash.into())
1768        } else {
1769            Ok(None)
1770        }
1771    }
1772
1773    /// Returns the block with matching number from database.
1774    ///
1775    /// If the header for this block is not found, this returns `None`.
1776    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1777    /// will return None.
1778    ///
1779    /// Returns an error if the requested block is below the earliest available history.
1780    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1781        if let Some(number) = self.convert_hash_or_number(id)? {
1782            let earliest_available = self.static_file_provider.earliest_history_height();
1783            if number < earliest_available {
1784                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1785            }
1786
1787            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1788
1789            // If the body indices are not found, this means that the transactions either do not
1790            // exist in the database yet, or they do exit but are not indexed.
1791            // If they exist but are not indexed, we don't have enough
1792            // information to return the block anyways, so we return `None`.
1793            let Some(transactions) = self.transactions_by_block(number.into())? else {
1794                return Ok(None)
1795            };
1796
1797            let body = self
1798                .storage
1799                .reader()
1800                .read_block_bodies(self, vec![(&header, transactions)])?
1801                .pop()
1802                .ok_or(ProviderError::InvalidStorageOutput)?;
1803
1804            return Ok(Some(Self::Block::new(header, body)))
1805        }
1806
1807        Ok(None)
1808    }
1809
1810    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1811        Ok(None)
1812    }
1813
1814    fn pending_block_and_receipts(
1815        &self,
1816    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1817        Ok(None)
1818    }
1819
1820    /// Returns the block with senders with matching number or hash from database.
1821    ///
1822    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1823    /// spot, and we want fast querying.**
1824    ///
1825    /// If the header for this block is not found, this returns `None`.
1826    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1827    /// will return None.
1828    fn recovered_block(
1829        &self,
1830        id: BlockHashOrNumber,
1831        transaction_kind: TransactionVariant,
1832    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1833        self.recovered_block(
1834            id,
1835            transaction_kind,
1836            |block_number| self.header_by_number(block_number),
1837            |header, body, senders| {
1838                Self::Block::new(header, body)
1839                    // Note: we're using unchecked here because we know the block contains valid txs
1840                    // wrt to its height and can ignore the s value check so pre
1841                    // EIP-2 txs are allowed
1842                    .try_into_recovered_unchecked(senders)
1843                    .map(Some)
1844                    .map_err(|_| ProviderError::SenderRecoveryError)
1845            },
1846        )
1847    }
1848
1849    fn sealed_block_with_senders(
1850        &self,
1851        id: BlockHashOrNumber,
1852        transaction_kind: TransactionVariant,
1853    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1854        self.recovered_block(
1855            id,
1856            transaction_kind,
1857            |block_number| self.sealed_header(block_number),
1858            |header, body, senders| {
1859                Self::Block::new_sealed(header, body)
1860                    // Note: we're using unchecked here because we know the block contains valid txs
1861                    // wrt to its height and can ignore the s value check so pre
1862                    // EIP-2 txs are allowed
1863                    .try_with_senders_unchecked(senders)
1864                    .map(Some)
1865                    .map_err(|_| ProviderError::SenderRecoveryError)
1866            },
1867        )
1868    }
1869
1870    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1871        self.block_range(
1872            range,
1873            |range| self.headers_range(range),
1874            |header, body, _| Ok(Self::Block::new(header, body)),
1875        )
1876    }
1877
1878    fn block_with_senders_range(
1879        &self,
1880        range: RangeInclusive<BlockNumber>,
1881    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1882        self.block_with_senders_range(
1883            range,
1884            |range| self.headers_range(range),
1885            |header, body, senders| {
1886                Self::Block::new(header, body)
1887                    .try_into_recovered_unchecked(senders)
1888                    .map_err(|_| ProviderError::SenderRecoveryError)
1889            },
1890        )
1891    }
1892
1893    fn recovered_block_range(
1894        &self,
1895        range: RangeInclusive<BlockNumber>,
1896    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1897        self.block_with_senders_range(
1898            range,
1899            |range| self.sealed_headers_range(range),
1900            |header, body, senders| {
1901                Self::Block::new_sealed(header, body)
1902                    .try_with_senders(senders)
1903                    .map_err(|_| ProviderError::SenderRecoveryError)
1904            },
1905        )
1906    }
1907
1908    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1909        Ok(self
1910            .tx
1911            .cursor_read::<tables::TransactionBlocks>()?
1912            .seek(id)
1913            .map(|b| b.map(|(_, bn)| bn))?)
1914    }
1915}
1916
1917impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1918    for DatabaseProvider<TX, N>
1919{
1920    /// Recovers transaction hashes by walking through `Transactions` table and
1921    /// calculating them in a parallel manner. Returned unsorted.
1922    fn transaction_hashes_by_range(
1923        &self,
1924        tx_range: Range<TxNumber>,
1925    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1926        self.static_file_provider.transaction_hashes_by_range(tx_range)
1927    }
1928}
1929
1930// Calculates the hash of the given transaction
1931impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1932    type Transaction = TxTy<N>;
1933
1934    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1935        self.with_rocksdb_snapshot(|rocksdb_ref| {
1936            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1937            reader.get_transaction_hash_number(tx_hash)
1938        })
1939    }
1940
1941    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1942        self.static_file_provider.transaction_by_id(id)
1943    }
1944
1945    fn transaction_by_id_unhashed(
1946        &self,
1947        id: TxNumber,
1948    ) -> ProviderResult<Option<Self::Transaction>> {
1949        self.static_file_provider.transaction_by_id_unhashed(id)
1950    }
1951
1952    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1953        if let Some(id) = self.transaction_id(hash)? {
1954            Ok(self.transaction_by_id_unhashed(id)?)
1955        } else {
1956            Ok(None)
1957        }
1958    }
1959
1960    fn transaction_by_hash_with_meta(
1961        &self,
1962        tx_hash: TxHash,
1963    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1964        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1965            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1966            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1967            let Some(sealed_header) = self.sealed_header(block_number)?
1968        {
1969            let (header, block_hash) = sealed_header.split();
1970            if let Some(block_body) = self.block_body_indices(block_number)? {
1971                // the index of the tx in the block is the offset:
1972                // len([start..tx_id])
1973                // NOTE: `transaction_id` is always `>=` the block's first
1974                // index
1975                let index = transaction_id - block_body.first_tx_num();
1976
1977                let meta = TransactionMeta {
1978                    tx_hash,
1979                    index,
1980                    block_hash,
1981                    block_number,
1982                    base_fee: header.base_fee_per_gas(),
1983                    excess_blob_gas: header.excess_blob_gas(),
1984                    timestamp: header.timestamp(),
1985                };
1986
1987                return Ok(Some((transaction, meta)))
1988            }
1989        }
1990
1991        Ok(None)
1992    }
1993
1994    fn transactions_by_block(
1995        &self,
1996        id: BlockHashOrNumber,
1997    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1998        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1999            let Some(body) = self.block_body_indices(block_number)?
2000        {
2001            let tx_range = body.tx_num_range();
2002            return if tx_range.is_empty() {
2003                Ok(Some(Vec::new()))
2004            } else {
2005                self.transactions_by_tx_range(tx_range).map(Some)
2006            }
2007        }
2008        Ok(None)
2009    }
2010
2011    fn transactions_by_block_range(
2012        &self,
2013        range: impl RangeBounds<BlockNumber>,
2014    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2015        let range = to_range(range);
2016
2017        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2018            .into_iter()
2019            .map(|body| {
2020                let tx_num_range = body.tx_num_range();
2021                if tx_num_range.is_empty() {
2022                    Ok(Vec::new())
2023                } else {
2024                    self.transactions_by_tx_range(tx_num_range)
2025                }
2026            })
2027            .collect()
2028    }
2029
2030    fn transactions_by_tx_range(
2031        &self,
2032        range: impl RangeBounds<TxNumber>,
2033    ) -> ProviderResult<Vec<Self::Transaction>> {
2034        self.static_file_provider.transactions_by_tx_range(range)
2035    }
2036
2037    fn senders_by_tx_range(
2038        &self,
2039        range: impl RangeBounds<TxNumber>,
2040    ) -> ProviderResult<Vec<Address>> {
2041        if EitherWriterDestination::senders(self).is_static_file() {
2042            self.static_file_provider.senders_by_tx_range(range)
2043        } else {
2044            self.cursor_read_collect::<tables::TransactionSenders>(range)
2045        }
2046    }
2047
2048    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2049        if EitherWriterDestination::senders(self).is_static_file() {
2050            self.static_file_provider.transaction_sender(id)
2051        } else {
2052            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2053        }
2054    }
2055}
2056
2057impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2058    type Receipt = ReceiptTy<N>;
2059
2060    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2061        self.static_file_provider.get_with_static_file_or_database(
2062            StaticFileSegment::Receipts,
2063            id,
2064            |static_file| static_file.receipt(id),
2065            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2066        )
2067    }
2068
2069    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2070        if let Some(id) = self.transaction_id(hash)? {
2071            self.receipt(id)
2072        } else {
2073            Ok(None)
2074        }
2075    }
2076
2077    fn receipts_by_block(
2078        &self,
2079        block: BlockHashOrNumber,
2080    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2081        if let Some(number) = self.convert_hash_or_number(block)? &&
2082            let Some(body) = self.block_body_indices(number)?
2083        {
2084            let tx_range = body.tx_num_range();
2085            return if tx_range.is_empty() {
2086                Ok(Some(Vec::new()))
2087            } else {
2088                self.receipts_by_tx_range(tx_range).map(Some)
2089            }
2090        }
2091        Ok(None)
2092    }
2093
2094    fn receipts_by_tx_range(
2095        &self,
2096        range: impl RangeBounds<TxNumber>,
2097    ) -> ProviderResult<Vec<Self::Receipt>> {
2098        self.static_file_provider.get_range_with_static_file_or_database(
2099            StaticFileSegment::Receipts,
2100            to_range(range),
2101            |static_file, range, _| static_file.receipts_by_tx_range(range),
2102            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2103            |_| true,
2104        )
2105    }
2106
2107    fn receipts_by_block_range(
2108        &self,
2109        block_range: RangeInclusive<BlockNumber>,
2110    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2111        if block_range.is_empty() {
2112            return Ok(Vec::new());
2113        }
2114
2115        // collect block body indices for each block in the range
2116        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2117        let mut block_body_indices = Vec::with_capacity(range_len);
2118        for block_num in block_range {
2119            if let Some(indices) = self.block_body_indices(block_num)? {
2120                block_body_indices.push(indices);
2121            } else {
2122                // use default indices for missing blocks (empty block)
2123                block_body_indices.push(StoredBlockBodyIndices::default());
2124            }
2125        }
2126
2127        if block_body_indices.is_empty() {
2128            return Ok(Vec::new());
2129        }
2130
2131        // find blocks with transactions to determine transaction range
2132        let non_empty_blocks: Vec<_> =
2133            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2134
2135        if non_empty_blocks.is_empty() {
2136            // all blocks are empty
2137            return Ok(vec![Vec::new(); block_body_indices.len()]);
2138        }
2139
2140        // calculate the overall transaction range
2141        let first_tx = non_empty_blocks[0].first_tx_num();
2142        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2143
2144        // fetch all receipts in the transaction range
2145        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2146        let mut receipts_iter = all_receipts.into_iter();
2147
2148        // distribute receipts to their respective blocks
2149        let mut result = Vec::with_capacity(block_body_indices.len());
2150        for indices in &block_body_indices {
2151            if indices.tx_count == 0 {
2152                result.push(Vec::new());
2153            } else {
2154                let block_receipts =
2155                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2156                result.push(block_receipts);
2157            }
2158        }
2159
2160        Ok(result)
2161    }
2162}
2163
2164impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2165    for DatabaseProvider<TX, N>
2166{
2167    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2168        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2169    }
2170
2171    fn block_body_indices_range(
2172        &self,
2173        range: RangeInclusive<BlockNumber>,
2174    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2175        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2176    }
2177}
2178
2179impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2180    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2181        Ok(if let Some(encoded) = id.get_pre_encoded() {
2182            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2183        } else {
2184            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2185        })
2186    }
2187
2188    /// Get stage checkpoint progress.
2189    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2190        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2191    }
2192
2193    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2194        self.tx
2195            .cursor_read::<tables::StageCheckpoints>()?
2196            .walk(None)?
2197            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2198            .map_err(ProviderError::Database)
2199    }
2200}
2201
2202impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2203    /// Save stage checkpoint.
2204    fn save_stage_checkpoint(
2205        &self,
2206        id: StageId,
2207        checkpoint: StageCheckpoint,
2208    ) -> ProviderResult<()> {
2209        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2210    }
2211
2212    /// Save stage checkpoint progress.
2213    fn save_stage_checkpoint_progress(
2214        &self,
2215        id: StageId,
2216        checkpoint: Vec<u8>,
2217    ) -> ProviderResult<()> {
2218        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2219    }
2220
2221    #[instrument(level = "debug", target = "providers::db", skip_all)]
2222    fn update_pipeline_stages(
2223        &self,
2224        block_number: BlockNumber,
2225        drop_stage_checkpoint: bool,
2226    ) -> ProviderResult<()> {
2227        // iterate over all existing stages in the table and update its progress.
2228        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2229        for stage_id in StageId::ALL {
2230            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2231            cursor.upsert(
2232                stage_id.to_string(),
2233                &StageCheckpoint {
2234                    block_number,
2235                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2236                },
2237            )?;
2238        }
2239
2240        Ok(())
2241    }
2242}
2243
2244impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2245    fn plain_state_storages(
2246        &self,
2247        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2248    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2249        if self.cached_storage_settings().use_hashed_state() {
2250            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2251
2252            addresses_with_keys
2253                .into_iter()
2254                .map(|(address, storage)| {
2255                    let hashed_address = keccak256(address);
2256                    storage
2257                        .into_iter()
2258                        .map(|key| -> ProviderResult<_> {
2259                            let hashed_key = keccak256(key);
2260                            let value = hashed_storage
2261                                .seek_by_key_subkey(hashed_address, hashed_key)?
2262                                .filter(|v| v.key == hashed_key)
2263                                .map(|v| v.value)
2264                                .unwrap_or_default();
2265                            Ok(StorageEntry { key, value })
2266                        })
2267                        .collect::<ProviderResult<Vec<_>>>()
2268                        .map(|storage| (address, storage))
2269                })
2270                .collect::<ProviderResult<Vec<(_, _)>>>()
2271        } else {
2272            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2273
2274            addresses_with_keys
2275                .into_iter()
2276                .map(|(address, storage)| {
2277                    storage
2278                        .into_iter()
2279                        .map(|key| -> ProviderResult<_> {
2280                            Ok(plain_storage
2281                                .seek_by_key_subkey(address, key)?
2282                                .filter(|v| v.key == key)
2283                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2284                        })
2285                        .collect::<ProviderResult<Vec<_>>>()
2286                        .map(|storage| (address, storage))
2287                })
2288                .collect::<ProviderResult<Vec<(_, _)>>>()
2289        }
2290    }
2291
2292    fn changed_storages_with_range(
2293        &self,
2294        range: RangeInclusive<BlockNumber>,
2295    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2296        if self.cached_storage_settings().storage_v2 {
2297            self.storage_changesets_range(range)?.into_iter().try_fold(
2298                BTreeMap::new(),
2299                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2300                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2301                    accounts.entry(address).or_default().insert(storage_entry.key);
2302                    Ok(accounts)
2303                },
2304            )
2305        } else {
2306            self.tx
2307                .cursor_read::<tables::StorageChangeSets>()?
2308                .walk_range(BlockNumberAddress::range(range))?
2309                // fold all storages and save its old state so we can remove it from HashedStorage
2310                // it is needed as it is dup table.
2311                .try_fold(
2312                    BTreeMap::new(),
2313                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2314                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2315                        accounts.entry(address).or_default().insert(storage_entry.key);
2316                        Ok(accounts)
2317                    },
2318                )
2319        }
2320    }
2321
2322    fn changed_storages_and_blocks_with_range(
2323        &self,
2324        range: RangeInclusive<BlockNumber>,
2325    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2326        if self.cached_storage_settings().storage_v2 {
2327            self.storage_changesets_range(range)?.into_iter().try_fold(
2328                BTreeMap::new(),
2329                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2330                    storages
2331                        .entry((index.address(), storage.key))
2332                        .or_default()
2333                        .push(index.block_number());
2334                    Ok(storages)
2335                },
2336            )
2337        } else {
2338            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2339
2340            let storage_changeset_lists =
2341                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2342                    BTreeMap::new(),
2343                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2344                     entry|
2345                     -> ProviderResult<_> {
2346                        let (index, storage) = entry?;
2347                        storages
2348                            .entry((index.address(), storage.key))
2349                            .or_default()
2350                            .push(index.block_number());
2351                        Ok(storages)
2352                    },
2353                )?;
2354
2355            Ok(storage_changeset_lists)
2356        }
2357    }
2358}
2359
2360impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2361    for DatabaseProvider<TX, N>
2362{
2363    type Receipt = ReceiptTy<N>;
2364
2365    #[instrument(level = "debug", target = "providers::db", skip_all)]
2366    fn write_state<'a>(
2367        &self,
2368        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2369        is_value_known: OriginalValuesKnown,
2370        config: StateWriteConfig,
2371    ) -> ProviderResult<()> {
2372        let execution_outcome = execution_outcome.into();
2373
2374        if self.cached_storage_settings().use_hashed_state() &&
2375            !config.write_receipts &&
2376            !config.write_account_changesets &&
2377            !config.write_storage_changesets
2378        {
2379            // In storage v2 with all outputs directed to static files, plain state and changesets
2380            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
2381            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
2382            self.write_bytecodes(
2383                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
2384            )?;
2385            return Ok(());
2386        }
2387
2388        let first_block = execution_outcome.first_block();
2389        let (plain_state, reverts) =
2390            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2391
2392        self.write_state_reverts(reverts, first_block, config)?;
2393        self.write_state_changes(plain_state)?;
2394
2395        if !config.write_receipts {
2396            return Ok(());
2397        }
2398
2399        let block_count = execution_outcome.len() as u64;
2400        let last_block = execution_outcome.last_block();
2401        let block_range = first_block..=last_block;
2402
2403        let tip = self.last_block_number()?.max(last_block);
2404
2405        // Fetch the first transaction number for each block in the range
2406        let block_indices: Vec<_> = self
2407            .block_body_indices_range(block_range)?
2408            .into_iter()
2409            .map(|b| b.first_tx_num)
2410            .collect();
2411
2412        // Ensure all expected blocks are present.
2413        if block_indices.len() < block_count as usize {
2414            let missing_blocks = block_count - block_indices.len() as u64;
2415            return Err(ProviderError::BlockBodyIndicesNotFound(
2416                last_block.saturating_sub(missing_blocks - 1),
2417            ));
2418        }
2419
2420        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2421
2422        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2423        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2424
2425        // All receipts from the last 128 blocks are required for blockchain tree, even with
2426        // [`PruneSegment::ContractLogs`].
2427        //
2428        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2429        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2430        // files yet. Once receipts exist in static files, we must continue writing to maintain
2431        // continuity and have no gaps.
2432        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2433            self.static_file_provider()
2434                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2435                .is_none()) &&
2436            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2437
2438        // Prepare set of addresses which logs should not be pruned.
2439        let mut allowed_addresses: AddressSet = AddressSet::default();
2440        for (_, addresses) in contract_log_pruner.range(..first_block) {
2441            allowed_addresses.extend(addresses.iter().copied());
2442        }
2443
2444        for (idx, (receipts, first_tx_index)) in
2445            execution_outcome.receipts().zip(block_indices).enumerate()
2446        {
2447            let block_number = first_block + idx as u64;
2448
2449            // Increment block number for receipts static file writer
2450            receipts_writer.increment_block(block_number)?;
2451
2452            // Skip writing receipts if pruning configuration requires us to.
2453            if prunable_receipts &&
2454                self.prune_modes
2455                    .receipts
2456                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2457            {
2458                continue
2459            }
2460
2461            // If there are new addresses to retain after this block number, track them
2462            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2463                allowed_addresses.extend(new_addresses.iter().copied());
2464            }
2465
2466            for (idx, receipt) in receipts.iter().enumerate() {
2467                let receipt_idx = first_tx_index + idx as u64;
2468                // Skip writing receipt if log filter is active and it does not have any logs to
2469                // retain
2470                if prunable_receipts &&
2471                    has_contract_log_filter &&
2472                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2473                {
2474                    continue
2475                }
2476
2477                receipts_writer.append_receipt(receipt_idx, receipt)?;
2478            }
2479        }
2480
2481        Ok(())
2482    }
2483
2484    fn write_state_reverts(
2485        &self,
2486        reverts: PlainStateReverts,
2487        first_block: BlockNumber,
2488        config: StateWriteConfig,
2489    ) -> ProviderResult<()> {
2490        // Write storage changes
2491        if config.write_storage_changesets {
2492            tracing::trace!("Writing storage changes");
2493            let mut storages_cursor =
2494                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2495            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2496                let block_number = first_block + block_index as BlockNumber;
2497
2498                tracing::trace!(block_number, "Writing block change");
2499                // sort changes by address.
2500                storage_changes.par_sort_unstable_by_key(|a| a.address);
2501                let total_changes =
2502                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2503                let mut changeset = Vec::with_capacity(total_changes);
2504                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2505                    let mut storage = storage_revert
2506                        .into_iter()
2507                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2508                        .collect::<Vec<_>>();
2509                    // sort storage slots by key.
2510                    storage.par_sort_unstable_by_key(|a| a.0);
2511
2512                    // If we are writing the primary storage wipe transition, the pre-existing
2513                    // storage state has to be taken from the database and written to storage
2514                    // history. See [StorageWipe::Primary] for more details.
2515                    //
2516                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
2517                    // require collecting wiped entries into a Vec like this, see
2518                    // `write_storage_trie_changesets`.
2519                    let mut wiped_storage = Vec::new();
2520                    if wiped {
2521                        tracing::trace!(?address, "Wiping storage");
2522                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2523                            wiped_storage.push((entry.key, entry.value));
2524                            while let Some(entry) = storages_cursor.next_dup_val()? {
2525                                wiped_storage.push((entry.key, entry.value))
2526                            }
2527                        }
2528                    }
2529
2530                    tracing::trace!(?address, ?storage, "Writing storage reverts");
2531                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2532                        changeset.push(StorageBeforeTx { address, key, value });
2533                    }
2534                }
2535
2536                let mut storage_changesets_writer =
2537                    EitherWriter::new_storage_changesets(self, block_number)?;
2538                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2539            }
2540        }
2541
2542        if !config.write_account_changesets {
2543            return Ok(());
2544        }
2545
2546        // Write account changes
2547        tracing::trace!(?first_block, "Writing account changes");
2548        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2549            let block_number = first_block + block_index as BlockNumber;
2550            let changeset = account_block_reverts
2551                .into_iter()
2552                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2553                .collect::<Vec<_>>();
2554            let mut account_changesets_writer =
2555                EitherWriter::new_account_changesets(self, block_number)?;
2556
2557            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2558        }
2559
2560        Ok(())
2561    }
2562
2563    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2564        // sort all entries so they can be written to database in more performant way.
2565        // and take smaller memory footprint.
2566        changes.accounts.par_sort_by_key(|a| a.0);
2567        changes.storage.par_sort_by_key(|a| a.address);
2568        changes.contracts.par_sort_by_key(|a| a.0);
2569
2570        if !self.cached_storage_settings().use_hashed_state() {
2571            // Write new account state
2572            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2573            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2574            // write account to database.
2575            for (address, account) in changes.accounts {
2576                if let Some(account) = account {
2577                    tracing::trace!(?address, "Updating plain state account");
2578                    accounts_cursor.upsert(address, &account.into())?;
2579                } else if accounts_cursor.seek_exact(address)?.is_some() {
2580                    tracing::trace!(?address, "Deleting plain state account");
2581                    accounts_cursor.delete_current()?;
2582                }
2583            }
2584
2585            // Write new storage state and wipe storage if needed.
2586            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2587            let mut storages_cursor =
2588                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2589            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2590                // Wiping of storage.
2591                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2592                    storages_cursor.delete_current_duplicates()?;
2593                }
2594                // cast storages to B256.
2595                let mut storage = storage
2596                    .into_iter()
2597                    .map(|(k, value)| StorageEntry { key: k.into(), value })
2598                    .collect::<Vec<_>>();
2599                // sort storage slots by key.
2600                storage.par_sort_unstable_by_key(|a| a.key);
2601
2602                for entry in storage {
2603                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2604                    if let Some(db_entry) =
2605                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2606                        db_entry.key == entry.key
2607                    {
2608                        storages_cursor.delete_current()?;
2609                    }
2610
2611                    if !entry.value.is_zero() {
2612                        storages_cursor.upsert(address, &entry)?;
2613                    }
2614                }
2615            }
2616        }
2617
2618        // Write bytecode
2619        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2620        self.write_bytecodes(
2621            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2622        )?;
2623
2624        Ok(())
2625    }
2626
2627    #[instrument(level = "debug", target = "providers::db", skip_all)]
2628    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2629        // Write hashed account updates.
2630        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2631        for (hashed_address, account) in hashed_state.accounts() {
2632            if let Some(account) = account {
2633                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2634            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2635                hashed_accounts_cursor.delete_current()?;
2636            }
2637        }
2638
2639        // Write hashed storage changes.
2640        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2641        let mut hashed_storage_cursor =
2642            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2643        for (hashed_address, storage) in sorted_storages {
2644            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2645                hashed_storage_cursor.delete_current_duplicates()?;
2646            }
2647
2648            for (hashed_slot, value) in storage.storage_slots_ref() {
2649                let entry = StorageEntry { key: *hashed_slot, value: *value };
2650
2651                if let Some(db_entry) =
2652                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2653                    db_entry.key == entry.key
2654                {
2655                    hashed_storage_cursor.delete_current()?;
2656                }
2657
2658                if !entry.value.is_zero() {
2659                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2660                }
2661            }
2662        }
2663
2664        Ok(())
2665    }
2666
2667    /// Remove the last N blocks of state.
2668    ///
2669    /// The latest state will be unwound
2670    ///
2671    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2672    ///    transaction ids.
2673    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2674    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2675    ///    the changesets.
2676    ///    - In order to have both the old and new values in the changesets, we also access the
2677    ///      plain state tables.
2678    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2679    ///    we:
2680    ///     1. Take the old value from the changeset
2681    ///     2. Take the new value from the plain state
2682    ///     3. Save the old value to the local state
2683    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2684    ///    have seen before we:
2685    ///     1. Take the old value from the changeset
2686    ///     2. Take the new value from the local state
2687    ///     3. Set the local state to the value in the changeset
2688    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2689        let range = block + 1..=self.last_block_number()?;
2690
2691        if range.is_empty() {
2692            return Ok(());
2693        }
2694
2695        // We are not removing block meta as it is used to get block changesets.
2696        let block_bodies = self.block_body_indices_range(range.clone())?;
2697
2698        // get transaction receipts
2699        let from_transaction_num =
2700            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2701
2702        let storage_range = BlockNumberAddress::range(range.clone());
2703        let storage_changeset = if self.cached_storage_settings().storage_v2 {
2704            let changesets = self.storage_changesets_range(range.clone())?;
2705            let mut changeset_writer =
2706                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2707            changeset_writer.prune_storage_changesets(block)?;
2708            changesets
2709        } else {
2710            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2711        };
2712        let account_changeset = if self.cached_storage_settings().storage_v2 {
2713            let changesets = self.account_changesets_range(range)?;
2714            let mut changeset_writer =
2715                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2716            changeset_writer.prune_account_changesets(block)?;
2717            changesets
2718        } else {
2719            self.take::<tables::AccountChangeSets>(range)?
2720        };
2721
2722        if self.cached_storage_settings().use_hashed_state() {
2723            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2724            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2725
2726            let (state, _) = self.populate_bundle_state_hashed(
2727                account_changeset,
2728                storage_changeset,
2729                &mut hashed_accounts_cursor,
2730                &mut hashed_storage_cursor,
2731            )?;
2732
2733            for (address, (old_account, new_account, storage)) in &state {
2734                if old_account != new_account {
2735                    let hashed_address = keccak256(address);
2736                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2737                    if let Some(account) = old_account {
2738                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2739                    } else if existing_entry.is_some() {
2740                        hashed_accounts_cursor.delete_current()?;
2741                    }
2742                }
2743
2744                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2745                    let hashed_address = keccak256(address);
2746                    let hashed_storage_key = keccak256(storage_key);
2747                    let storage_entry =
2748                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2749                    if hashed_storage_cursor
2750                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2751                        .is_some_and(|s| s.key == hashed_storage_key)
2752                    {
2753                        hashed_storage_cursor.delete_current()?
2754                    }
2755
2756                    if !old_storage_value.is_zero() {
2757                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2758                    }
2759                }
2760            }
2761        } else {
2762            // This is not working for blocks that are not at tip. as plain state is not the last
2763            // state of end range. We should rename the functions or add support to access
2764            // History state. Accessing history state can be tricky but we are not gaining
2765            // anything.
2766            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2767            let mut plain_storage_cursor =
2768                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2769
2770            let (state, _) = self.populate_bundle_state(
2771                account_changeset,
2772                storage_changeset,
2773                &mut plain_accounts_cursor,
2774                &mut plain_storage_cursor,
2775            )?;
2776
2777            for (address, (old_account, new_account, storage)) in &state {
2778                if old_account != new_account {
2779                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2780                    if let Some(account) = old_account {
2781                        plain_accounts_cursor.upsert(*address, account)?;
2782                    } else if existing_entry.is_some() {
2783                        plain_accounts_cursor.delete_current()?;
2784                    }
2785                }
2786
2787                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2788                    let storage_entry =
2789                        StorageEntry { key: *storage_key, value: *old_storage_value };
2790                    if plain_storage_cursor
2791                        .seek_by_key_subkey(*address, *storage_key)?
2792                        .is_some_and(|s| s.key == *storage_key)
2793                    {
2794                        plain_storage_cursor.delete_current()?
2795                    }
2796
2797                    if !old_storage_value.is_zero() {
2798                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2799                    }
2800                }
2801            }
2802        }
2803
2804        self.remove_receipts_from(from_transaction_num, block)?;
2805
2806        Ok(())
2807    }
2808
2809    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2810    ///
2811    /// The latest state will be unwound and returned back with all the blocks
2812    ///
2813    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2814    ///    transaction ids.
2815    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2816    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2817    ///    the changesets.
2818    ///    - In order to have both the old and new values in the changesets, we also access the
2819    ///      plain state tables.
2820    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2821    ///    we:
2822    ///     1. Take the old value from the changeset
2823    ///     2. Take the new value from the plain state
2824    ///     3. Save the old value to the local state
2825    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2826    ///    have seen before we:
2827    ///     1. Take the old value from the changeset
2828    ///     2. Take the new value from the local state
2829    ///     3. Set the local state to the value in the changeset
2830    fn take_state_above(
2831        &self,
2832        block: BlockNumber,
2833    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2834        let range = block + 1..=self.last_block_number()?;
2835
2836        if range.is_empty() {
2837            return Ok(ExecutionOutcome::default())
2838        }
2839        let start_block_number = *range.start();
2840
2841        // We are not removing block meta as it is used to get block changesets.
2842        let block_bodies = self.block_body_indices_range(range.clone())?;
2843
2844        // get transaction receipts
2845        let from_transaction_num =
2846            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2847        let to_transaction_num =
2848            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2849
2850        let storage_range = BlockNumberAddress::range(range.clone());
2851        let storage_changeset = if let Some(highest_block) = self
2852            .static_file_provider
2853            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2854            self.cached_storage_settings().storage_v2
2855        {
2856            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2857            let mut changeset_writer =
2858                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2859            changeset_writer.prune_storage_changesets(block)?;
2860            changesets
2861        } else {
2862            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2863        };
2864
2865        // if there are static files for this segment, prune them.
2866        let highest_changeset_block = self
2867            .static_file_provider
2868            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2869        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2870            self.cached_storage_settings().storage_v2
2871        {
2872            // TODO: add a `take` method that removes and returns the items instead of doing this
2873            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2874            let mut changeset_writer =
2875                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2876            changeset_writer.prune_account_changesets(block)?;
2877
2878            changesets
2879        } else {
2880            // Have to remove from static files if they exist, otherwise remove using `take` for the
2881            // changeset tables
2882            self.take::<tables::AccountChangeSets>(range)?
2883        };
2884
2885        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2886            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2887            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2888
2889            let (state, reverts) = self.populate_bundle_state_hashed(
2890                account_changeset,
2891                storage_changeset,
2892                &mut hashed_accounts_cursor,
2893                &mut hashed_storage_cursor,
2894            )?;
2895
2896            for (address, (old_account, new_account, storage)) in &state {
2897                if old_account != new_account {
2898                    let hashed_address = keccak256(address);
2899                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2900                    if let Some(account) = old_account {
2901                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2902                    } else if existing_entry.is_some() {
2903                        hashed_accounts_cursor.delete_current()?;
2904                    }
2905                }
2906
2907                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2908                    let hashed_address = keccak256(address);
2909                    let hashed_storage_key = keccak256(storage_key);
2910                    let storage_entry =
2911                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2912                    if hashed_storage_cursor
2913                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2914                        .is_some_and(|s| s.key == hashed_storage_key)
2915                    {
2916                        hashed_storage_cursor.delete_current()?
2917                    }
2918
2919                    if !old_storage_value.is_zero() {
2920                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2921                    }
2922                }
2923            }
2924
2925            (state, reverts)
2926        } else {
2927            // This is not working for blocks that are not at tip. as plain state is not the last
2928            // state of end range. We should rename the functions or add support to access
2929            // History state. Accessing history state can be tricky but we are not gaining
2930            // anything.
2931            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2932            let mut plain_storage_cursor =
2933                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2934
2935            let (state, reverts) = self.populate_bundle_state(
2936                account_changeset,
2937                storage_changeset,
2938                &mut plain_accounts_cursor,
2939                &mut plain_storage_cursor,
2940            )?;
2941
2942            for (address, (old_account, new_account, storage)) in &state {
2943                if old_account != new_account {
2944                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2945                    if let Some(account) = old_account {
2946                        plain_accounts_cursor.upsert(*address, account)?;
2947                    } else if existing_entry.is_some() {
2948                        plain_accounts_cursor.delete_current()?;
2949                    }
2950                }
2951
2952                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2953                    let storage_entry =
2954                        StorageEntry { key: *storage_key, value: *old_storage_value };
2955                    if plain_storage_cursor
2956                        .seek_by_key_subkey(*address, *storage_key)?
2957                        .is_some_and(|s| s.key == *storage_key)
2958                    {
2959                        plain_storage_cursor.delete_current()?
2960                    }
2961
2962                    if !old_storage_value.is_zero() {
2963                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2964                    }
2965                }
2966            }
2967
2968            (state, reverts)
2969        };
2970
2971        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2972        let mut receipts_iter = self
2973            .static_file_provider
2974            .get_range_with_static_file_or_database(
2975                StaticFileSegment::Receipts,
2976                from_transaction_num..to_transaction_num + 1,
2977                |static_file, range, _| {
2978                    static_file
2979                        .receipts_by_tx_range(range.clone())
2980                        .map(|r| range.into_iter().zip(r).collect())
2981                },
2982                |range, _| {
2983                    self.tx
2984                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2985                        .walk_range(range)?
2986                        .map(|r| r.map_err(Into::into))
2987                        .collect()
2988                },
2989                |_| true,
2990            )?
2991            .into_iter()
2992            .peekable();
2993
2994        let mut receipts = Vec::with_capacity(block_bodies.len());
2995        // loop break if we are at the end of the blocks.
2996        for block_body in block_bodies {
2997            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2998            for num in block_body.tx_num_range() {
2999                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3000                    block_receipts.push(receipts_iter.next().unwrap().1);
3001                }
3002            }
3003            receipts.push(block_receipts);
3004        }
3005
3006        self.remove_receipts_from(from_transaction_num, block)?;
3007
3008        Ok(ExecutionOutcome::new_init(
3009            state,
3010            reverts,
3011            Vec::new(),
3012            receipts,
3013            start_block_number,
3014            Vec::new(),
3015        ))
3016    }
3017}
3018
3019impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3020    fn write_account_trie_updates<A: TrieTableAdapter>(
3021        tx: &TX,
3022        trie_updates: &TrieUpdatesSorted,
3023        num_entries: &mut usize,
3024    ) -> ProviderResult<()>
3025    where
3026        TX: DbTxMut,
3027    {
3028        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3029        // Process sorted account nodes
3030        for (key, updated_node) in trie_updates.account_nodes_ref() {
3031            let nibbles = A::AccountKey::from(*key);
3032            match updated_node {
3033                Some(node) => {
3034                    if !key.is_empty() {
3035                        *num_entries += 1;
3036                        account_trie_cursor.upsert(nibbles, node)?;
3037                    }
3038                }
3039                None => {
3040                    *num_entries += 1;
3041                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3042                        account_trie_cursor.delete_current()?;
3043                    }
3044                }
3045            }
3046        }
3047        Ok(())
3048    }
3049
3050    fn write_storage_tries<A: TrieTableAdapter>(
3051        tx: &TX,
3052        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3053        num_entries: &mut usize,
3054    ) -> ProviderResult<()>
3055    where
3056        TX: DbTxMut,
3057    {
3058        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3059        for (hashed_address, storage_trie_updates) in storage_tries {
3060            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3061                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3062            *num_entries +=
3063                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3064            cursor = db_storage_trie_cursor.cursor;
3065        }
3066        Ok(())
3067    }
3068}
3069
3070impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3071    /// Writes trie updates to the database with already sorted updates.
3072    ///
3073    /// Returns the number of entries modified.
3074    #[instrument(level = "debug", target = "providers::db", skip_all)]
3075    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3076        if trie_updates.is_empty() {
3077            return Ok(0)
3078        }
3079
3080        // Track the number of inserted entries.
3081        let mut num_entries = 0;
3082
3083        reth_trie_db::with_adapter!(self, |A| {
3084            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3085        });
3086
3087        num_entries +=
3088            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3089
3090        Ok(num_entries)
3091    }
3092}
3093
3094impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3095    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3096    ///
3097    /// Expects the storage trie updates to already be sorted by the hashed address key.
3098    ///
3099    /// Returns the number of entries modified.
3100    fn write_storage_trie_updates_sorted<'a>(
3101        &self,
3102        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3103    ) -> ProviderResult<usize> {
3104        let mut num_entries = 0;
3105        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3106        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3107        reth_trie_db::with_adapter!(self, |A| {
3108            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3109        });
3110        Ok(num_entries)
3111    }
3112}
3113
3114impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3115    fn unwind_account_hashing<'a>(
3116        &self,
3117        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3118    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3119        // Aggregate all block changesets and make a list of accounts that have been changed.
3120        // Note that collecting and then reversing the order is necessary to ensure that the
3121        // changes are applied in the correct order.
3122        let hashed_accounts = changesets
3123            .into_iter()
3124            .map(|(_, e)| (keccak256(e.address), e.info))
3125            .collect::<Vec<_>>()
3126            .into_iter()
3127            .rev()
3128            .collect::<BTreeMap<_, _>>();
3129
3130        // Apply values to HashedState, and remove the account if it's None.
3131        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3132        for (hashed_address, account) in &hashed_accounts {
3133            if let Some(account) = account {
3134                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3135            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3136                hashed_accounts_cursor.delete_current()?;
3137            }
3138        }
3139
3140        Ok(hashed_accounts)
3141    }
3142
3143    fn unwind_account_hashing_range(
3144        &self,
3145        range: impl RangeBounds<BlockNumber>,
3146    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3147        let changesets = self.account_changesets_range(range)?;
3148        self.unwind_account_hashing(changesets.iter())
3149    }
3150
3151    fn insert_account_for_hashing(
3152        &self,
3153        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3154    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3155        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3156        let hashed_accounts =
3157            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3158        for (hashed_address, account) in &hashed_accounts {
3159            if let Some(account) = account {
3160                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3161            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3162                hashed_accounts_cursor.delete_current()?;
3163            }
3164        }
3165        Ok(hashed_accounts)
3166    }
3167
3168    fn unwind_storage_hashing(
3169        &self,
3170        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3171    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3172        // Aggregate all block changesets and make list of accounts that have been changed.
3173        let mut hashed_storages = changesets
3174            .into_iter()
3175            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3176                let hashed_key = keccak256(storage_entry.key);
3177                (keccak256(address), hashed_key, storage_entry.value)
3178            })
3179            .collect::<Vec<_>>();
3180        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3181
3182        // Apply values to HashedState, and remove the account if it's None.
3183        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3184            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3185        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3186        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3187            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3188
3189            if hashed_storage
3190                .seek_by_key_subkey(hashed_address, key)?
3191                .is_some_and(|entry| entry.key == key)
3192            {
3193                hashed_storage.delete_current()?;
3194            }
3195
3196            if !value.is_zero() {
3197                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3198            }
3199        }
3200        Ok(hashed_storage_keys)
3201    }
3202
3203    fn unwind_storage_hashing_range(
3204        &self,
3205        range: impl RangeBounds<BlockNumber>,
3206    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3207        let changesets = self.storage_changesets_range(range)?;
3208        self.unwind_storage_hashing(changesets.into_iter())
3209    }
3210
3211    fn insert_storage_for_hashing(
3212        &self,
3213        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3214    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3215        // hash values
3216        let hashed_storages =
3217            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3218                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3219                    map.insert(keccak256(entry.key), entry.value);
3220                    map
3221                });
3222                map.insert(keccak256(address), storage);
3223                map
3224            });
3225
3226        let hashed_storage_keys = hashed_storages
3227            .iter()
3228            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3229            .collect();
3230
3231        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3232        // Hash the address and key and apply them to HashedStorage (if Storage is None
3233        // just remove it);
3234        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3235            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3236                if hashed_storage_cursor
3237                    .seek_by_key_subkey(hashed_address, key)?
3238                    .is_some_and(|entry| entry.key == key)
3239                {
3240                    hashed_storage_cursor.delete_current()?;
3241                }
3242
3243                if !value.is_zero() {
3244                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3245                }
3246                Ok(())
3247            })
3248        })?;
3249
3250        Ok(hashed_storage_keys)
3251    }
3252}
3253
3254impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3255    fn unwind_account_history_indices<'a>(
3256        &self,
3257        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3258    ) -> ProviderResult<usize> {
3259        let mut last_indices = changesets
3260            .into_iter()
3261            .map(|(index, account)| (account.address, *index))
3262            .collect::<Vec<_>>();
3263        last_indices.sort_unstable_by_key(|(a, _)| *a);
3264
3265        if self.cached_storage_settings().storage_v2 {
3266            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3267            self.pending_rocksdb_batches.lock().push(batch);
3268        } else {
3269            // Unwind the account history index in MDBX.
3270            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3271            for &(address, rem_index) in &last_indices {
3272                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3273                    &mut cursor,
3274                    ShardedKey::last(address),
3275                    rem_index,
3276                    |sharded_key| sharded_key.key == address,
3277                )?;
3278
3279                // Check the last returned partial shard.
3280                // If it's not empty, the shard needs to be reinserted.
3281                if !partial_shard.is_empty() {
3282                    cursor.insert(
3283                        ShardedKey::last(address),
3284                        &BlockNumberList::new_pre_sorted(partial_shard),
3285                    )?;
3286                }
3287            }
3288        }
3289
3290        let changesets = last_indices.len();
3291        Ok(changesets)
3292    }
3293
3294    fn unwind_account_history_indices_range(
3295        &self,
3296        range: impl RangeBounds<BlockNumber>,
3297    ) -> ProviderResult<usize> {
3298        let changesets = self.account_changesets_range(range)?;
3299        self.unwind_account_history_indices(changesets.iter())
3300    }
3301
3302    fn insert_account_history_index(
3303        &self,
3304        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3305    ) -> ProviderResult<()> {
3306        self.append_history_index::<_, tables::AccountsHistory>(
3307            account_transitions,
3308            ShardedKey::new,
3309        )
3310    }
3311
3312    fn unwind_storage_history_indices(
3313        &self,
3314        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3315    ) -> ProviderResult<usize> {
3316        let mut storage_changesets = changesets
3317            .into_iter()
3318            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3319            .collect::<Vec<_>>();
3320        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
3321
3322        if self.cached_storage_settings().storage_v2 {
3323            let batch =
3324                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3325            self.pending_rocksdb_batches.lock().push(batch);
3326        } else {
3327            // Unwind the storage history index in MDBX.
3328            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3329            for &(address, storage_key, rem_index) in &storage_changesets {
3330                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3331                    &mut cursor,
3332                    StorageShardedKey::last(address, storage_key),
3333                    rem_index,
3334                    |storage_sharded_key| {
3335                        storage_sharded_key.address == address &&
3336                            storage_sharded_key.sharded_key.key == storage_key
3337                    },
3338                )?;
3339
3340                // Check the last returned partial shard.
3341                // If it's not empty, the shard needs to be reinserted.
3342                if !partial_shard.is_empty() {
3343                    cursor.insert(
3344                        StorageShardedKey::last(address, storage_key),
3345                        &BlockNumberList::new_pre_sorted(partial_shard),
3346                    )?;
3347                }
3348            }
3349        }
3350
3351        let changesets = storage_changesets.len();
3352        Ok(changesets)
3353    }
3354
3355    fn unwind_storage_history_indices_range(
3356        &self,
3357        range: impl RangeBounds<BlockNumber>,
3358    ) -> ProviderResult<usize> {
3359        let changesets = self.storage_changesets_range(range)?;
3360        self.unwind_storage_history_indices(changesets.into_iter())
3361    }
3362
3363    fn insert_storage_history_index(
3364        &self,
3365        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3366    ) -> ProviderResult<()> {
3367        self.append_history_index::<_, tables::StoragesHistory>(
3368            storage_transitions,
3369            |(address, storage_key), highest_block_number| {
3370                StorageShardedKey::new(address, storage_key, highest_block_number)
3371            },
3372        )
3373    }
3374
3375    #[instrument(level = "debug", target = "providers::db", skip_all)]
3376    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3377        let storage_settings = self.cached_storage_settings();
3378        if !storage_settings.storage_v2 {
3379            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3380            self.insert_account_history_index(indices)?;
3381        }
3382
3383        if !storage_settings.storage_v2 {
3384            let indices = self.changed_storages_and_blocks_with_range(range)?;
3385            self.insert_storage_history_index(indices)?;
3386        }
3387
3388        Ok(())
3389    }
3390}
3391
3392impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3393    for DatabaseProvider<TX, N>
3394{
3395    fn take_block_and_execution_above(
3396        &self,
3397        block: BlockNumber,
3398    ) -> ProviderResult<Chain<Self::Primitives>> {
3399        let range = block + 1..=self.last_block_number()?;
3400
3401        self.unwind_trie_state_from(block + 1)?;
3402
3403        // get execution res
3404        let execution_state = self.take_state_above(block)?;
3405
3406        let blocks = self.recovered_block_range(range)?;
3407
3408        // remove block bodies it is needed for both get block range and get block execution results
3409        // that is why it is deleted afterwards.
3410        self.remove_blocks_above(block)?;
3411
3412        // Update pipeline progress
3413        self.update_pipeline_stages(block, true)?;
3414
3415        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3416    }
3417
3418    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3419        self.unwind_trie_state_from(block + 1)?;
3420
3421        // remove execution res
3422        self.remove_state_above(block)?;
3423
3424        // remove block bodies it is needed for both get block range and get block execution results
3425        // that is why it is deleted afterwards.
3426        self.remove_blocks_above(block)?;
3427
3428        // Update pipeline progress
3429        self.update_pipeline_stages(block, true)?;
3430
3431        Ok(())
3432    }
3433}
3434
3435impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3436    for DatabaseProvider<TX, N>
3437{
3438    type Block = BlockTy<N>;
3439    type Receipt = ReceiptTy<N>;
3440
3441    /// Inserts the block into the database, writing to both static files and MDBX.
3442    ///
3443    /// This is a convenience method primarily used in tests. For production use,
3444    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3445    fn insert_block(
3446        &self,
3447        block: &RecoveredBlock<Self::Block>,
3448    ) -> ProviderResult<StoredBlockBodyIndices> {
3449        let block_number = block.number();
3450
3451        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3452        let executed_block = ExecutedBlock::new(
3453            Arc::new(block.clone()),
3454            Arc::new(BlockExecutionOutput {
3455                result: BlockExecutionResult {
3456                    receipts: Default::default(),
3457                    requests: Default::default(),
3458                    gas_used: 0,
3459                    blob_gas_used: 0,
3460                },
3461                state: Default::default(),
3462            }),
3463            ComputedTrieData::default(),
3464        );
3465
3466        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3467        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3468
3469        // Return the body indices
3470        self.block_body_indices(block_number)?
3471            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3472    }
3473
3474    fn append_block_bodies(
3475        &self,
3476        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3477    ) -> ProviderResult<()> {
3478        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3479
3480        // Initialize writer if we will be writing transactions to staticfiles
3481        let mut tx_writer =
3482            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3483
3484        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3485        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3486
3487        // Get id for the next tx_num or zero if there are no transactions.
3488        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3489
3490        for (block_number, body) in &bodies {
3491            // Increment block on static file header.
3492            tx_writer.increment_block(*block_number)?;
3493
3494            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3495            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3496
3497            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3498
3499            // insert block meta
3500            block_indices_cursor.append(*block_number, &block_indices)?;
3501
3502            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3503
3504            let Some(body) = body else { continue };
3505
3506            // write transaction block index
3507            if !body.transactions().is_empty() {
3508                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3509                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3510            }
3511
3512            // write transactions
3513            for transaction in body.transactions() {
3514                tx_writer.append_transaction(next_tx_num, transaction)?;
3515
3516                // Increment transaction id for each transaction.
3517                next_tx_num += 1;
3518            }
3519        }
3520
3521        self.storage.writer().write_block_bodies(self, bodies)?;
3522
3523        Ok(())
3524    }
3525
3526    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3527        let last_block_number = self.last_block_number()?;
3528        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3529        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3530            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3531        }
3532
3533        // Get highest static file block for the total block range
3534        let highest_static_file_block = self
3535            .static_file_provider()
3536            .get_highest_static_file_block(StaticFileSegment::Headers)
3537            .expect("todo: error handling, headers should exist");
3538
3539        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3540        // we remove only what is ABOVE the block.
3541        //
3542        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3543        // will have three blocks to remove, which will be block 8, 7, and 6.
3544        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3545        self.static_file_provider()
3546            .get_writer(block, StaticFileSegment::Headers)?
3547            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3548
3549        // First transaction to be removed
3550        let unwind_tx_from = self
3551            .block_body_indices(block)?
3552            .map(|b| b.next_tx_num())
3553            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3554
3555        // Last transaction to be removed
3556        let unwind_tx_to = self
3557            .tx
3558            .cursor_read::<tables::BlockBodyIndices>()?
3559            .last()?
3560            // shouldn't happen because this was OK above
3561            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3562            .1
3563            .last_tx_num();
3564
3565        if unwind_tx_from <= unwind_tx_to {
3566            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3567            self.with_rocksdb_batch(|batch| {
3568                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3569                for (hash, _) in hashes {
3570                    writer.delete_transaction_hash_number(hash)?;
3571                }
3572                Ok(((), writer.into_raw_rocksdb_batch()))
3573            })?;
3574        }
3575
3576        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
3577        // exists in static files or the database.
3578        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3579            EitherWriter::new_senders(self, last_block_number)?
3580                .prune_senders(unwind_tx_from, block)?;
3581        }
3582
3583        self.remove_bodies_above(block)?;
3584
3585        Ok(())
3586    }
3587
3588    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3589        self.storage.writer().remove_block_bodies_above(self, block)?;
3590
3591        // First transaction to be removed
3592        let unwind_tx_from = self
3593            .block_body_indices(block)?
3594            .map(|b| b.next_tx_num())
3595            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3596
3597        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3598        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3599
3600        let static_file_tx_num =
3601            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3602
3603        let to_delete = static_file_tx_num
3604            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3605            .unwrap_or_default();
3606
3607        self.static_file_provider
3608            .latest_writer(StaticFileSegment::Transactions)?
3609            .prune_transactions(to_delete, block)?;
3610
3611        Ok(())
3612    }
3613
3614    /// Appends blocks with their execution state to the database.
3615    ///
3616    /// **Note:** This function is only used in tests.
3617    ///
3618    /// History indices are written to the appropriate backend based on storage settings:
3619    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3620    ///
3621    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3622    fn append_blocks_with_state(
3623        &self,
3624        blocks: Vec<RecoveredBlock<Self::Block>>,
3625        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3626        hashed_state: HashedPostStateSorted,
3627    ) -> ProviderResult<()> {
3628        if blocks.is_empty() {
3629            debug!(target: "providers::db", "Attempted to append empty block range");
3630            return Ok(())
3631        }
3632
3633        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3634        // `None`.
3635        let first_number = blocks[0].number();
3636
3637        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3638        // `None`.
3639        let last_block_number = blocks[blocks.len() - 1].number();
3640
3641        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3642
3643        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3644        // This is necessary because with edge storage, changesets are written to static files
3645        // whose index isn't updated until commit, making them invisible to subsequent reads
3646        // within the same transaction.
3647        let (account_transitions, storage_transitions) = {
3648            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3649            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3650            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3651                let block_number = first_number + block_idx as u64;
3652                for (address, account_revert) in block_reverts {
3653                    account_transitions.entry(*address).or_default().push(block_number);
3654                    for storage_key in account_revert.storage.keys() {
3655                        let key = B256::from(storage_key.to_be_bytes());
3656                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3657                    }
3658                }
3659            }
3660            (account_transitions, storage_transitions)
3661        };
3662
3663        // Insert the blocks
3664        for block in blocks {
3665            self.insert_block(&block)?;
3666            durations_recorder.record_relative(metrics::Action::InsertBlock);
3667        }
3668
3669        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3670        durations_recorder.record_relative(metrics::Action::InsertState);
3671
3672        // insert hashes and intermediate merkle nodes
3673        self.write_hashed_state(&hashed_state)?;
3674        durations_recorder.record_relative(metrics::Action::InsertHashes);
3675
3676        // Use pre-computed transitions for history indices since static file
3677        // writes aren't visible until commit.
3678        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3679        // append_*_history_shard which handles read-merge-write internally.
3680        let storage_settings = self.cached_storage_settings();
3681        if storage_settings.storage_v2 {
3682            self.with_rocksdb_batch(|mut batch| {
3683                for (address, blocks) in account_transitions {
3684                    batch.append_account_history_shard(address, blocks)?;
3685                }
3686                Ok(((), Some(batch.into_inner())))
3687            })?;
3688        } else {
3689            self.insert_account_history_index(account_transitions)?;
3690        }
3691        if storage_settings.storage_v2 {
3692            self.with_rocksdb_batch(|mut batch| {
3693                for ((address, key), blocks) in storage_transitions {
3694                    batch.append_storage_history_shard(address, key, blocks)?;
3695                }
3696                Ok(((), Some(batch.into_inner())))
3697            })?;
3698        } else {
3699            self.insert_storage_history_index(storage_transitions)?;
3700        }
3701        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3702
3703        // Update pipeline progress
3704        self.update_pipeline_stages(last_block_number, false)?;
3705        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3706
3707        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3708
3709        Ok(())
3710    }
3711}
3712
3713impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3714    fn get_prune_checkpoint(
3715        &self,
3716        segment: PruneSegment,
3717    ) -> ProviderResult<Option<PruneCheckpoint>> {
3718        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3719    }
3720
3721    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3722        Ok(PruneSegment::variants()
3723            .filter_map(|segment| {
3724                self.tx
3725                    .get::<tables::PruneCheckpoints>(segment)
3726                    .transpose()
3727                    .map(|chk| chk.map(|chk| (segment, chk)))
3728            })
3729            .collect::<Result<_, _>>()?)
3730    }
3731}
3732
3733impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3734    fn save_prune_checkpoint(
3735        &self,
3736        segment: PruneSegment,
3737        checkpoint: PruneCheckpoint,
3738    ) -> ProviderResult<()> {
3739        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3740    }
3741}
3742
3743impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3744    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3745        let db_entries = self.tx.entries::<T>()?;
3746        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3747            Ok(entries) => entries,
3748            Err(ProviderError::UnsupportedProvider) => 0,
3749            Err(err) => return Err(err),
3750        };
3751
3752        Ok(db_entries + static_file_entries)
3753    }
3754}
3755
3756impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3757    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3758        let mut finalized_blocks = self
3759            .tx
3760            .cursor_read::<tables::ChainState>()?
3761            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3762            .take(1)
3763            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3764
3765        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3766        Ok(last_finalized_block_number)
3767    }
3768
3769    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3770        let mut finalized_blocks = self
3771            .tx
3772            .cursor_read::<tables::ChainState>()?
3773            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3774            .take(1)
3775            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3776
3777        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3778        Ok(last_finalized_block_number)
3779    }
3780}
3781
3782impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3783    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3784        Ok(self
3785            .tx
3786            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3787    }
3788
3789    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3790        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3791    }
3792}
3793
3794impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3795    type Tx = TX;
3796
3797    fn tx_ref(&self) -> &Self::Tx {
3798        &self.tx
3799    }
3800
3801    fn tx_mut(&mut self) -> &mut Self::Tx {
3802        &mut self.tx
3803    }
3804
3805    fn into_tx(self) -> Self::Tx {
3806        self.tx
3807    }
3808
3809    fn prune_modes_ref(&self) -> &PruneModes {
3810        self.prune_modes_ref()
3811    }
3812
3813    /// Commit database transaction, static files, and pending `RocksDB` batches.
3814    #[instrument(
3815        name = "DatabaseProvider::commit",
3816        level = "debug",
3817        target = "providers::db",
3818        skip_all
3819    )]
3820    fn commit(self) -> ProviderResult<()> {
3821        // For unwinding it makes more sense to commit the database first, since if
3822        // it is interrupted before the static files commit, we can just
3823        // truncate the static files according to the
3824        // checkpoints on the next start-up.
3825        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3826            self.tx.commit()?;
3827
3828            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3829            for batch in batches {
3830                self.rocksdb_provider.commit_batch(batch)?;
3831            }
3832
3833            self.static_file_provider.commit()?;
3834        } else {
3835            // Normal path: finalize() will call sync_all() if not already synced
3836            let mut timings = metrics::CommitTimings::default();
3837
3838            let start = Instant::now();
3839            self.static_file_provider.finalize()?;
3840            timings.sf = start.elapsed();
3841
3842            let start = Instant::now();
3843            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3844            for batch in batches {
3845                self.rocksdb_provider.commit_batch(batch)?;
3846            }
3847            timings.rocksdb = start.elapsed();
3848
3849            let start = Instant::now();
3850            self.tx.commit()?;
3851            timings.mdbx = start.elapsed();
3852
3853            self.metrics.record_commit(&timings);
3854        }
3855
3856        Ok(())
3857    }
3858}
3859
3860impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3861    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3862        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3863    }
3864}
3865
3866impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3867    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3868        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3869    }
3870}
3871
3872impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3873    fn cached_storage_settings(&self) -> StorageSettings {
3874        *self.storage_settings.read()
3875    }
3876
3877    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3878        *self.storage_settings.write() = settings;
3879    }
3880}
3881
3882impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3883    fn storage_path(&self) -> PathBuf {
3884        self.db_path.clone()
3885    }
3886}
3887
3888#[cfg(test)]
3889mod tests {
3890    use super::*;
3891    use crate::{
3892        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3893        BlockWriter,
3894    };
3895    use alloy_consensus::Header;
3896    use alloy_primitives::{
3897        map::{AddressMap, B256Map},
3898        U256,
3899    };
3900    use reth_chain_state::ExecutedBlock;
3901    use reth_ethereum_primitives::Receipt;
3902    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3903    use reth_primitives_traits::SealedBlock;
3904    use reth_testing_utils::generators::{self, random_block, BlockParams};
3905    use reth_trie::{
3906        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3907    };
3908    use revm_database::BundleState;
3909    use revm_state::AccountInfo;
3910
3911    #[test]
3912    fn test_receipts_by_block_range_empty_range() {
3913        let factory = create_test_provider_factory();
3914        let provider = factory.provider().unwrap();
3915
3916        // empty range should return empty vec
3917        let start = 10u64;
3918        let end = 9u64;
3919        let result = provider.receipts_by_block_range(start..=end).unwrap();
3920        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3921    }
3922
3923    #[test]
3924    fn test_receipts_by_block_range_nonexistent_blocks() {
3925        let factory = create_test_provider_factory();
3926        let provider = factory.provider().unwrap();
3927
3928        // non-existent blocks should return empty vecs for each block
3929        let result = provider.receipts_by_block_range(10..=12).unwrap();
3930        assert_eq!(result, vec![vec![], vec![], vec![]]);
3931    }
3932
3933    #[test]
3934    fn test_receipts_by_block_range_single_block() {
3935        let factory = create_test_provider_factory();
3936        let data = BlockchainTestData::default();
3937
3938        let provider_rw = factory.provider_rw().unwrap();
3939        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3940        provider_rw
3941            .write_state(
3942                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3943                crate::OriginalValuesKnown::No,
3944                StateWriteConfig::default(),
3945            )
3946            .unwrap();
3947        provider_rw.insert_block(&data.blocks[0].0).unwrap();
3948        provider_rw
3949            .write_state(
3950                &data.blocks[0].1,
3951                crate::OriginalValuesKnown::No,
3952                StateWriteConfig::default(),
3953            )
3954            .unwrap();
3955        provider_rw.commit().unwrap();
3956
3957        let provider = factory.provider().unwrap();
3958        let result = provider.receipts_by_block_range(1..=1).unwrap();
3959
3960        // should have one vec with one receipt
3961        assert_eq!(result.len(), 1);
3962        assert_eq!(result[0].len(), 1);
3963        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3964    }
3965
3966    #[test]
3967    fn test_receipts_by_block_range_multiple_blocks() {
3968        let factory = create_test_provider_factory();
3969        let data = BlockchainTestData::default();
3970
3971        let provider_rw = factory.provider_rw().unwrap();
3972        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3973        provider_rw
3974            .write_state(
3975                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3976                crate::OriginalValuesKnown::No,
3977                StateWriteConfig::default(),
3978            )
3979            .unwrap();
3980        for i in 0..3 {
3981            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3982            provider_rw
3983                .write_state(
3984                    &data.blocks[i].1,
3985                    crate::OriginalValuesKnown::No,
3986                    StateWriteConfig::default(),
3987                )
3988                .unwrap();
3989        }
3990        provider_rw.commit().unwrap();
3991
3992        let provider = factory.provider().unwrap();
3993        let result = provider.receipts_by_block_range(1..=3).unwrap();
3994
3995        // should have 3 vecs, each with one receipt
3996        assert_eq!(result.len(), 3);
3997        for (i, block_receipts) in result.iter().enumerate() {
3998            assert_eq!(block_receipts.len(), 1);
3999            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4000        }
4001    }
4002
4003    #[test]
4004    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4005        let factory = create_test_provider_factory();
4006        let data = BlockchainTestData::default();
4007
4008        let provider_rw = factory.provider_rw().unwrap();
4009        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4010        provider_rw
4011            .write_state(
4012                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4013                crate::OriginalValuesKnown::No,
4014                StateWriteConfig::default(),
4015            )
4016            .unwrap();
4017
4018        // insert blocks 1-3 with receipts
4019        for i in 0..3 {
4020            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4021            provider_rw
4022                .write_state(
4023                    &data.blocks[i].1,
4024                    crate::OriginalValuesKnown::No,
4025                    StateWriteConfig::default(),
4026                )
4027                .unwrap();
4028        }
4029        provider_rw.commit().unwrap();
4030
4031        let provider = factory.provider().unwrap();
4032        let result = provider.receipts_by_block_range(1..=3).unwrap();
4033
4034        // verify each block has one receipt
4035        assert_eq!(result.len(), 3);
4036        for block_receipts in &result {
4037            assert_eq!(block_receipts.len(), 1);
4038        }
4039    }
4040
4041    #[test]
4042    fn test_receipts_by_block_range_partial_range() {
4043        let factory = create_test_provider_factory();
4044        let data = BlockchainTestData::default();
4045
4046        let provider_rw = factory.provider_rw().unwrap();
4047        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4048        provider_rw
4049            .write_state(
4050                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4051                crate::OriginalValuesKnown::No,
4052                StateWriteConfig::default(),
4053            )
4054            .unwrap();
4055        for i in 0..3 {
4056            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4057            provider_rw
4058                .write_state(
4059                    &data.blocks[i].1,
4060                    crate::OriginalValuesKnown::No,
4061                    StateWriteConfig::default(),
4062                )
4063                .unwrap();
4064        }
4065        provider_rw.commit().unwrap();
4066
4067        let provider = factory.provider().unwrap();
4068
4069        // request range that includes both existing and non-existing blocks
4070        let result = provider.receipts_by_block_range(2..=5).unwrap();
4071        assert_eq!(result.len(), 4);
4072
4073        // blocks 2-3 should have receipts, blocks 4-5 should be empty
4074        assert_eq!(result[0].len(), 1); // block 2
4075        assert_eq!(result[1].len(), 1); // block 3
4076        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
4077        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
4078
4079        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4080        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4081    }
4082
4083    #[test]
4084    fn test_receipts_by_block_range_all_empty_blocks() {
4085        let factory = create_test_provider_factory();
4086        let mut rng = generators::rng();
4087
4088        // create blocks with no transactions
4089        let mut blocks = Vec::new();
4090        for i in 0..3 {
4091            let block =
4092                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4093            blocks.push(block);
4094        }
4095
4096        let provider_rw = factory.provider_rw().unwrap();
4097        for block in blocks {
4098            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4099        }
4100        provider_rw.commit().unwrap();
4101
4102        let provider = factory.provider().unwrap();
4103        let result = provider.receipts_by_block_range(1..=3).unwrap();
4104
4105        assert_eq!(result.len(), 3);
4106        for block_receipts in result {
4107            assert_eq!(block_receipts.len(), 0);
4108        }
4109    }
4110
4111    #[test]
4112    fn test_receipts_by_block_range_consistency_with_individual_calls() {
4113        let factory = create_test_provider_factory();
4114        let data = BlockchainTestData::default();
4115
4116        let provider_rw = factory.provider_rw().unwrap();
4117        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4118        provider_rw
4119            .write_state(
4120                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4121                crate::OriginalValuesKnown::No,
4122                StateWriteConfig::default(),
4123            )
4124            .unwrap();
4125        for i in 0..3 {
4126            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4127            provider_rw
4128                .write_state(
4129                    &data.blocks[i].1,
4130                    crate::OriginalValuesKnown::No,
4131                    StateWriteConfig::default(),
4132                )
4133                .unwrap();
4134        }
4135        provider_rw.commit().unwrap();
4136
4137        let provider = factory.provider().unwrap();
4138
4139        // get receipts using block range method
4140        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4141
4142        // get receipts using individual block calls
4143        let mut individual_results = Vec::new();
4144        for block_num in 1..=3 {
4145            let receipts =
4146                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4147            individual_results.push(receipts);
4148        }
4149
4150        assert_eq!(range_result, individual_results);
4151    }
4152
4153    #[test]
4154    fn test_write_trie_updates_sorted() {
4155        use reth_trie::{
4156            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4157            BranchNodeCompact, StorageTrieEntry,
4158        };
4159
4160        let factory = create_test_provider_factory();
4161        let provider_rw = factory.provider_rw().unwrap();
4162
4163        // Pre-populate account trie with data that will be deleted
4164        {
4165            let tx = provider_rw.tx_ref();
4166            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4167
4168            // Add account node that will be deleted
4169            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4170            cursor
4171                .upsert(
4172                    to_delete,
4173                    &BranchNodeCompact::new(
4174                        0b1010_1010_1010_1010, // state_mask
4175                        0b0000_0000_0000_0000, // tree_mask
4176                        0b0000_0000_0000_0000, // hash_mask
4177                        vec![],
4178                        None,
4179                    ),
4180                )
4181                .unwrap();
4182
4183            // Add account node that will be updated
4184            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4185            cursor
4186                .upsert(
4187                    to_update,
4188                    &BranchNodeCompact::new(
4189                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4190                        0b0000_0000_0000_0000, // tree_mask
4191                        0b0000_0000_0000_0000, // hash_mask
4192                        vec![],
4193                        None,
4194                    ),
4195                )
4196                .unwrap();
4197        }
4198
4199        // Pre-populate storage tries with data
4200        let storage_address1 = B256::from([1u8; 32]);
4201        let storage_address2 = B256::from([2u8; 32]);
4202        {
4203            let tx = provider_rw.tx_ref();
4204            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4205
4206            // Add storage nodes for address1 (one will be deleted)
4207            storage_cursor
4208                .upsert(
4209                    storage_address1,
4210                    &StorageTrieEntry {
4211                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4212                        node: BranchNodeCompact::new(
4213                            0b0011_0011_0011_0011, // will be deleted
4214                            0b0000_0000_0000_0000,
4215                            0b0000_0000_0000_0000,
4216                            vec![],
4217                            None,
4218                        ),
4219                    },
4220                )
4221                .unwrap();
4222
4223            // Add storage nodes for address2 (will be wiped)
4224            storage_cursor
4225                .upsert(
4226                    storage_address2,
4227                    &StorageTrieEntry {
4228                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4229                        node: BranchNodeCompact::new(
4230                            0b1100_1100_1100_1100, // will be wiped
4231                            0b0000_0000_0000_0000,
4232                            0b0000_0000_0000_0000,
4233                            vec![],
4234                            None,
4235                        ),
4236                    },
4237                )
4238                .unwrap();
4239            storage_cursor
4240                .upsert(
4241                    storage_address2,
4242                    &StorageTrieEntry {
4243                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4244                        node: BranchNodeCompact::new(
4245                            0b0011_1100_0011_1100, // will be wiped
4246                            0b0000_0000_0000_0000,
4247                            0b0000_0000_0000_0000,
4248                            vec![],
4249                            None,
4250                        ),
4251                    },
4252                )
4253                .unwrap();
4254        }
4255
4256        // Create sorted account trie updates
4257        let account_nodes = vec![
4258            (
4259                Nibbles::from_nibbles([0x1, 0x2]),
4260                Some(BranchNodeCompact::new(
4261                    0b1111_1111_1111_1111, // state_mask (updated)
4262                    0b0000_0000_0000_0000, // tree_mask
4263                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4264                    vec![],
4265                    None,
4266                )),
4267            ),
4268            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4269            (
4270                Nibbles::from_nibbles([0x5, 0x6]),
4271                Some(BranchNodeCompact::new(
4272                    0b1111_1111_1111_1111, // state_mask
4273                    0b0000_0000_0000_0000, // tree_mask
4274                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4275                    vec![],
4276                    None,
4277                )),
4278            ),
4279        ];
4280
4281        // Create sorted storage trie updates
4282        let storage_trie1 = StorageTrieUpdatesSorted {
4283            is_deleted: false,
4284            storage_nodes: vec![
4285                (
4286                    Nibbles::from_nibbles([0x1, 0x0]),
4287                    Some(BranchNodeCompact::new(
4288                        0b1111_0000_0000_0000, // state_mask
4289                        0b0000_0000_0000_0000, // tree_mask
4290                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4291                        vec![],
4292                        None,
4293                    )),
4294                ),
4295                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4296            ],
4297        };
4298
4299        let storage_trie2 = StorageTrieUpdatesSorted {
4300            is_deleted: true, // Wipe all storage for this address
4301            storage_nodes: vec![],
4302        };
4303
4304        let mut storage_tries = B256Map::default();
4305        storage_tries.insert(storage_address1, storage_trie1);
4306        storage_tries.insert(storage_address2, storage_trie2);
4307
4308        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4309
4310        // Write the sorted trie updates
4311        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4312
4313        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4314        // storage deletion = 5
4315        assert_eq!(num_entries, 5);
4316
4317        // Verify account trie updates were written correctly
4318        let tx = provider_rw.tx_ref();
4319        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4320
4321        // Check first account node was updated
4322        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4323        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4324        assert!(entry1.is_some(), "Updated account node should exist");
4325        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4326        assert_eq!(
4327            entry1.unwrap().1.state_mask,
4328            expected_mask,
4329            "Account node should have updated state_mask"
4330        );
4331
4332        // Check deleted account node no longer exists
4333        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4334        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4335        assert!(entry2.is_none(), "Deleted account node should not exist");
4336
4337        // Check new account node exists
4338        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4339        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4340        assert!(entry3.is_some(), "New account node should exist");
4341
4342        // Verify storage trie updates were written correctly
4343        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4344
4345        // Check storage for address1
4346        let storage_entries1: Vec<_> = storage_cursor
4347            .walk_dup(Some(storage_address1), None)
4348            .unwrap()
4349            .collect::<Result<Vec<_>, _>>()
4350            .unwrap();
4351        assert_eq!(
4352            storage_entries1.len(),
4353            1,
4354            "Storage address1 should have 1 entry after deletion"
4355        );
4356        assert_eq!(
4357            storage_entries1[0].1.nibbles.0,
4358            Nibbles::from_nibbles([0x1, 0x0]),
4359            "Remaining entry should be [0x1, 0x0]"
4360        );
4361
4362        // Check storage for address2 was wiped
4363        let storage_entries2: Vec<_> = storage_cursor
4364            .walk_dup(Some(storage_address2), None)
4365            .unwrap()
4366            .collect::<Result<Vec<_>, _>>()
4367            .unwrap();
4368        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4369
4370        provider_rw.commit().unwrap();
4371    }
4372
4373    #[test]
4374    fn test_prunable_receipts_logic() {
4375        let insert_blocks =
4376            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4377                let mut rng = generators::rng();
4378                for block_num in 0..=tip_block {
4379                    let block = random_block(
4380                        &mut rng,
4381                        block_num,
4382                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4383                    );
4384                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4385                }
4386            };
4387
4388        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4389            let outcome = ExecutionOutcome {
4390                first_block: block,
4391                receipts: vec![vec![Receipt {
4392                    tx_type: Default::default(),
4393                    success: true,
4394                    cumulative_gas_used: block, // identifier to assert against
4395                    logs: vec![],
4396                }]],
4397                ..Default::default()
4398            };
4399            provider_rw
4400                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4401                .unwrap();
4402            provider_rw.commit().unwrap();
4403        };
4404
4405        // Legacy mode (receipts in DB) - should be prunable
4406        {
4407            let factory = create_test_provider_factory();
4408            let storage_settings = StorageSettings::v1();
4409            factory.set_storage_settings_cache(storage_settings);
4410            let factory = factory.with_prune_modes(PruneModes {
4411                receipts: Some(PruneMode::Before(100)),
4412                ..Default::default()
4413            });
4414
4415            let tip_block = 200u64;
4416            let first_block = 1u64;
4417
4418            // create chain
4419            let provider_rw = factory.provider_rw().unwrap();
4420            insert_blocks(&provider_rw, tip_block, 1);
4421            provider_rw.commit().unwrap();
4422
4423            write_receipts(
4424                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4425                first_block,
4426            );
4427            write_receipts(
4428                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4429                tip_block - 1,
4430            );
4431
4432            let provider = factory.provider().unwrap();
4433
4434            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4435                assert!(provider
4436                    .receipts_by_block(block.into())
4437                    .unwrap()
4438                    .is_some_and(|r| r.len() == num_receipts));
4439            }
4440        }
4441
4442        // Static files mode
4443        {
4444            let factory = create_test_provider_factory();
4445            let storage_settings = StorageSettings::v2();
4446            factory.set_storage_settings_cache(storage_settings);
4447            let factory = factory.with_prune_modes(PruneModes {
4448                receipts: Some(PruneMode::Before(2)),
4449                ..Default::default()
4450            });
4451
4452            let tip_block = 200u64;
4453
4454            // create chain
4455            let provider_rw = factory.provider_rw().unwrap();
4456            insert_blocks(&provider_rw, tip_block, 1);
4457            provider_rw.commit().unwrap();
4458
4459            // Attempt to write receipts for block 0 and 1 (should be skipped)
4460            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4461            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4462
4463            assert!(factory
4464                .static_file_provider()
4465                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4466                .is_none(),);
4467            assert!(factory
4468                .static_file_provider()
4469                .get_highest_static_file_block(StaticFileSegment::Receipts)
4470                .is_some_and(|b| b == 1),);
4471
4472            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4473            // static files.
4474            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4475            assert!(factory
4476                .static_file_provider()
4477                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4478                .is_some_and(|num| num == 2),);
4479
4480            // After having a receipt already in static files, attempt to skip the next receipt by
4481            // changing the prune mode. It should NOT skip it and should still write the receipt,
4482            // since static files do not support gaps.
4483            let factory = factory.with_prune_modes(PruneModes {
4484                receipts: Some(PruneMode::Before(100)),
4485                ..Default::default()
4486            });
4487            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4488            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4489            write_receipts(provider_rw, 3);
4490
4491            // Ensure we can only fetch the 2 last receipts.
4492            //
4493            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4494            // to the block number it belongs to easily identify and assert.
4495            let provider = factory.provider().unwrap();
4496            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4497            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4498                assert!(provider
4499                    .receipts_by_block(num.into())
4500                    .unwrap()
4501                    .is_some_and(|r| r.len() == num_receipts));
4502
4503                let receipt = provider.receipt(num).unwrap();
4504                if num_receipts > 0 {
4505                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4506                } else {
4507                    assert!(receipt.is_none());
4508                }
4509            }
4510        }
4511    }
4512
4513    #[test]
4514    fn test_try_into_history_rejects_unexecuted_blocks() {
4515        use reth_storage_api::TryIntoHistoricalStateProvider;
4516
4517        let factory = create_test_provider_factory();
4518
4519        // Insert genesis block to have some data
4520        let data = BlockchainTestData::default();
4521        let provider_rw = factory.provider_rw().unwrap();
4522        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4523        provider_rw
4524            .write_state(
4525                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4526                crate::OriginalValuesKnown::No,
4527                StateWriteConfig::default(),
4528            )
4529            .unwrap();
4530        provider_rw.commit().unwrap();
4531
4532        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4533        let provider = factory.provider().unwrap();
4534
4535        // Requesting historical state for block 0 (executed) should succeed
4536        let result = provider.try_into_history_at_block(0);
4537        assert!(result.is_ok(), "Block 0 should be available");
4538
4539        // Get another provider and request state for block 100 (not executed)
4540        let provider = factory.provider().unwrap();
4541        let result = provider.try_into_history_at_block(100);
4542
4543        // Should fail with BlockNotExecuted error
4544        match result {
4545            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4546            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4547            Ok(_) => panic!("Expected error, got Ok"),
4548        }
4549    }
4550
4551    #[test]
4552    fn test_unwind_storage_hashing_with_hashed_state() {
4553        let factory = create_test_provider_factory();
4554        let storage_settings = StorageSettings::v2();
4555        factory.set_storage_settings_cache(storage_settings);
4556
4557        let address = Address::random();
4558        let hashed_address = keccak256(address);
4559
4560        let plain_slot = B256::random();
4561        let hashed_slot = keccak256(plain_slot);
4562
4563        let current_value = U256::from(100);
4564        let old_value = U256::from(42);
4565
4566        let provider_rw = factory.provider_rw().unwrap();
4567        provider_rw
4568            .tx
4569            .cursor_dup_write::<tables::HashedStorages>()
4570            .unwrap()
4571            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4572            .unwrap();
4573
4574        let changesets = vec![(
4575            BlockNumberAddress((1, address)),
4576            StorageEntry { key: plain_slot, value: old_value },
4577        )];
4578
4579        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4580
4581        assert_eq!(result.len(), 1);
4582        assert!(result.contains_key(&hashed_address));
4583        assert!(result[&hashed_address].contains(&hashed_slot));
4584
4585        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4586        let entry = cursor
4587            .seek_by_key_subkey(hashed_address, hashed_slot)
4588            .unwrap()
4589            .expect("entry should exist");
4590        assert_eq!(entry.key, hashed_slot);
4591        assert_eq!(entry.value, old_value);
4592    }
4593
4594    #[test]
4595    fn test_write_and_remove_state_roundtrip_legacy() {
4596        let factory = create_test_provider_factory();
4597        let storage_settings = StorageSettings::v1();
4598        assert!(!storage_settings.use_hashed_state());
4599        factory.set_storage_settings_cache(storage_settings);
4600
4601        let address = Address::with_last_byte(1);
4602        let hashed_address = keccak256(address);
4603        let slot = U256::from(5);
4604        let slot_key = B256::from(slot);
4605        let hashed_slot = keccak256(slot_key);
4606
4607        let mut rng = generators::rng();
4608        let block0 =
4609            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4610        let block1 =
4611            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4612
4613        {
4614            let provider_rw = factory.provider_rw().unwrap();
4615            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4616            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4617            provider_rw
4618                .tx
4619                .cursor_write::<tables::PlainAccountState>()
4620                .unwrap()
4621                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4622                .unwrap();
4623            provider_rw.commit().unwrap();
4624        }
4625
4626        let provider_rw = factory.provider_rw().unwrap();
4627
4628        let mut state_init: BundleStateInit = AddressMap::default();
4629        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4630        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4631        state_init.insert(
4632            address,
4633            (
4634                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4635                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4636                storage_map,
4637            ),
4638        );
4639
4640        let mut reverts_init: RevertsInit = HashMap::default();
4641        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4642        block_reverts.insert(
4643            address,
4644            (
4645                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4646                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4647            ),
4648        );
4649        reverts_init.insert(1, block_reverts);
4650
4651        let execution_outcome =
4652            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4653
4654        provider_rw
4655            .write_state(
4656                &execution_outcome,
4657                OriginalValuesKnown::Yes,
4658                StateWriteConfig {
4659                    write_receipts: false,
4660                    write_account_changesets: true,
4661                    write_storage_changesets: true,
4662                },
4663            )
4664            .unwrap();
4665
4666        let hashed_state =
4667            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4668        provider_rw.write_hashed_state(&hashed_state).unwrap();
4669
4670        let account = provider_rw
4671            .tx
4672            .cursor_read::<tables::PlainAccountState>()
4673            .unwrap()
4674            .seek_exact(address)
4675            .unwrap()
4676            .unwrap()
4677            .1;
4678        assert_eq!(account.nonce, 1);
4679
4680        let storage_entry = provider_rw
4681            .tx
4682            .cursor_dup_read::<tables::PlainStorageState>()
4683            .unwrap()
4684            .seek_by_key_subkey(address, slot_key)
4685            .unwrap()
4686            .unwrap();
4687        assert_eq!(storage_entry.key, slot_key);
4688        assert_eq!(storage_entry.value, U256::from(10));
4689
4690        let hashed_entry = provider_rw
4691            .tx
4692            .cursor_dup_read::<tables::HashedStorages>()
4693            .unwrap()
4694            .seek_by_key_subkey(hashed_address, hashed_slot)
4695            .unwrap()
4696            .unwrap();
4697        assert_eq!(hashed_entry.key, hashed_slot);
4698        assert_eq!(hashed_entry.value, U256::from(10));
4699
4700        let account_cs_entries = provider_rw
4701            .tx
4702            .cursor_dup_read::<tables::AccountChangeSets>()
4703            .unwrap()
4704            .walk(Some(1))
4705            .unwrap()
4706            .collect::<Result<Vec<_>, _>>()
4707            .unwrap();
4708        assert!(!account_cs_entries.is_empty());
4709
4710        let storage_cs_entries = provider_rw
4711            .tx
4712            .cursor_read::<tables::StorageChangeSets>()
4713            .unwrap()
4714            .walk(Some(BlockNumberAddress((1, address))))
4715            .unwrap()
4716            .collect::<Result<Vec<_>, _>>()
4717            .unwrap();
4718        assert!(!storage_cs_entries.is_empty());
4719        assert_eq!(storage_cs_entries[0].1.key, slot_key);
4720
4721        provider_rw.remove_state_above(0).unwrap();
4722
4723        let restored_account = provider_rw
4724            .tx
4725            .cursor_read::<tables::PlainAccountState>()
4726            .unwrap()
4727            .seek_exact(address)
4728            .unwrap()
4729            .unwrap()
4730            .1;
4731        assert_eq!(restored_account.nonce, 0);
4732
4733        let storage_gone = provider_rw
4734            .tx
4735            .cursor_dup_read::<tables::PlainStorageState>()
4736            .unwrap()
4737            .seek_by_key_subkey(address, slot_key)
4738            .unwrap();
4739        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4740
4741        let account_cs_after = provider_rw
4742            .tx
4743            .cursor_dup_read::<tables::AccountChangeSets>()
4744            .unwrap()
4745            .walk(Some(1))
4746            .unwrap()
4747            .collect::<Result<Vec<_>, _>>()
4748            .unwrap();
4749        assert!(account_cs_after.is_empty());
4750
4751        let storage_cs_after = provider_rw
4752            .tx
4753            .cursor_read::<tables::StorageChangeSets>()
4754            .unwrap()
4755            .walk(Some(BlockNumberAddress((1, address))))
4756            .unwrap()
4757            .collect::<Result<Vec<_>, _>>()
4758            .unwrap();
4759        assert!(storage_cs_after.is_empty());
4760    }
4761
4762    #[test]
4763    fn test_unwind_storage_hashing_legacy() {
4764        let factory = create_test_provider_factory();
4765        let storage_settings = StorageSettings::v1();
4766        assert!(!storage_settings.use_hashed_state());
4767        factory.set_storage_settings_cache(storage_settings);
4768
4769        let address = Address::random();
4770        let hashed_address = keccak256(address);
4771
4772        let plain_slot = B256::random();
4773        let hashed_slot = keccak256(plain_slot);
4774
4775        let current_value = U256::from(100);
4776        let old_value = U256::from(42);
4777
4778        let provider_rw = factory.provider_rw().unwrap();
4779        provider_rw
4780            .tx
4781            .cursor_dup_write::<tables::HashedStorages>()
4782            .unwrap()
4783            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4784            .unwrap();
4785
4786        let changesets = vec![(
4787            BlockNumberAddress((1, address)),
4788            StorageEntry { key: plain_slot, value: old_value },
4789        )];
4790
4791        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4792
4793        assert_eq!(result.len(), 1);
4794        assert!(result.contains_key(&hashed_address));
4795        assert!(result[&hashed_address].contains(&hashed_slot));
4796
4797        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4798        let entry = cursor
4799            .seek_by_key_subkey(hashed_address, hashed_slot)
4800            .unwrap()
4801            .expect("entry should exist");
4802        assert_eq!(entry.key, hashed_slot);
4803        assert_eq!(entry.value, old_value);
4804    }
4805
4806    #[test]
4807    fn test_write_state_and_historical_read_hashed() {
4808        use reth_storage_api::StateProvider;
4809        use reth_trie::{HashedPostState, KeccakKeyHasher};
4810        use revm_database::BundleState;
4811        use revm_state::AccountInfo;
4812
4813        let factory = create_test_provider_factory();
4814        factory.set_storage_settings_cache(StorageSettings::v2());
4815
4816        let address = Address::with_last_byte(1);
4817        let slot = U256::from(5);
4818        let slot_key = B256::from(slot);
4819        let hashed_address = keccak256(address);
4820        let hashed_slot = keccak256(slot_key);
4821
4822        {
4823            let sf = factory.static_file_provider();
4824            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4825            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4826            hw.append_header(&h0, &B256::ZERO).unwrap();
4827            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4828            hw.append_header(&h1, &B256::ZERO).unwrap();
4829            hw.commit().unwrap();
4830
4831            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4832            aw.append_account_changeset(vec![], 0).unwrap();
4833            aw.commit().unwrap();
4834
4835            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4836            sw.append_storage_changeset(vec![], 0).unwrap();
4837            sw.commit().unwrap();
4838        }
4839
4840        let provider_rw = factory.provider_rw().unwrap();
4841
4842        let bundle = BundleState::builder(1..=1)
4843            .state_present_account_info(
4844                address,
4845                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4846            )
4847            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4848            .revert_account_info(1, address, Some(None))
4849            .revert_storage(1, address, vec![(slot, U256::ZERO)])
4850            .build();
4851
4852        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4853
4854        provider_rw
4855            .tx
4856            .put::<tables::BlockBodyIndices>(
4857                1,
4858                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4859            )
4860            .unwrap();
4861
4862        provider_rw
4863            .write_state(
4864                &execution_outcome,
4865                OriginalValuesKnown::Yes,
4866                StateWriteConfig {
4867                    write_receipts: false,
4868                    write_account_changesets: true,
4869                    write_storage_changesets: true,
4870                },
4871            )
4872            .unwrap();
4873
4874        let hashed_state =
4875            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4876        provider_rw.write_hashed_state(&hashed_state).unwrap();
4877
4878        let plain_storage_entries = provider_rw
4879            .tx
4880            .cursor_dup_read::<tables::PlainStorageState>()
4881            .unwrap()
4882            .walk(None)
4883            .unwrap()
4884            .collect::<Result<Vec<_>, _>>()
4885            .unwrap();
4886        assert!(plain_storage_entries.is_empty());
4887
4888        let hashed_entry = provider_rw
4889            .tx
4890            .cursor_dup_read::<tables::HashedStorages>()
4891            .unwrap()
4892            .seek_by_key_subkey(hashed_address, hashed_slot)
4893            .unwrap()
4894            .unwrap();
4895        assert_eq!(hashed_entry.key, hashed_slot);
4896        assert_eq!(hashed_entry.value, U256::from(10));
4897
4898        provider_rw.static_file_provider().commit().unwrap();
4899
4900        let sf = factory.static_file_provider();
4901        let storage_cs = sf.storage_changeset(1).unwrap();
4902        assert!(!storage_cs.is_empty());
4903        assert_eq!(storage_cs[0].1.key, slot_key);
4904
4905        let account_cs = sf.account_block_changeset(1).unwrap();
4906        assert!(!account_cs.is_empty());
4907        assert_eq!(account_cs[0].address, address);
4908
4909        let historical_value =
4910            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
4911        assert_eq!(historical_value, None);
4912    }
4913
4914    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4915    enum StorageMode {
4916        V1,
4917        V2,
4918    }
4919
4920    fn run_save_blocks_and_verify(mode: StorageMode) {
4921        use alloy_primitives::map::{FbBuildHasher, HashMap};
4922
4923        let factory = create_test_provider_factory();
4924
4925        match mode {
4926            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4927            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4928        }
4929
4930        let num_blocks = 3u64;
4931        let accounts_per_block = 5usize;
4932        let slots_per_account = 3usize;
4933
4934        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4935            SealedHeader::new(
4936                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4937                B256::ZERO,
4938            ),
4939            Default::default(),
4940        );
4941
4942        let genesis_executed = ExecutedBlock::new(
4943            Arc::new(genesis.try_recover().unwrap()),
4944            Arc::new(BlockExecutionOutput {
4945                result: BlockExecutionResult {
4946                    receipts: vec![],
4947                    requests: Default::default(),
4948                    gas_used: 0,
4949                    blob_gas_used: 0,
4950                },
4951                state: Default::default(),
4952            }),
4953            ComputedTrieData::default(),
4954        );
4955        let provider_rw = factory.provider_rw().unwrap();
4956        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
4957        provider_rw.commit().unwrap();
4958
4959        let mut blocks: Vec<ExecutedBlock> = Vec::new();
4960        let mut parent_hash = B256::ZERO;
4961
4962        for block_num in 1..=num_blocks {
4963            let mut builder = BundleState::builder(block_num..=block_num);
4964
4965            for acct_idx in 0..accounts_per_block {
4966                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
4967                let info = AccountInfo {
4968                    nonce: block_num,
4969                    balance: U256::from(block_num * 100 + acct_idx as u64),
4970                    ..Default::default()
4971                };
4972
4973                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
4974                    slots_per_account as u64)
4975                    .map(|s| {
4976                        (
4977                            U256::from(s + acct_idx as u64 * 100),
4978                            (U256::ZERO, U256::from(block_num * 1000 + s)),
4979                        )
4980                    })
4981                    .collect();
4982
4983                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
4984                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
4985                    .collect();
4986
4987                builder = builder
4988                    .state_present_account_info(address, info)
4989                    .revert_account_info(block_num, address, Some(None))
4990                    .state_storage(address, storage)
4991                    .revert_storage(block_num, address, revert_storage);
4992            }
4993
4994            let bundle = builder.build();
4995
4996            let hashed_state =
4997                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4998
4999            let header = Header {
5000                number: block_num,
5001                parent_hash,
5002                difficulty: U256::from(1),
5003                ..Default::default()
5004            };
5005            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5006                header,
5007                Default::default(),
5008            );
5009            parent_hash = block.hash();
5010
5011            let executed = ExecutedBlock::new(
5012                Arc::new(block.try_recover().unwrap()),
5013                Arc::new(BlockExecutionOutput {
5014                    result: BlockExecutionResult {
5015                        receipts: vec![],
5016                        requests: Default::default(),
5017                        gas_used: 0,
5018                        blob_gas_used: 0,
5019                    },
5020                    state: bundle,
5021                }),
5022                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5023            );
5024            blocks.push(executed);
5025        }
5026
5027        let provider_rw = factory.provider_rw().unwrap();
5028        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5029        provider_rw.commit().unwrap();
5030
5031        let provider = factory.provider().unwrap();
5032
5033        for block_num in 1..=num_blocks {
5034            for acct_idx in 0..accounts_per_block {
5035                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5036                let hashed_address = keccak256(address);
5037
5038                let ha_entry = provider
5039                    .tx_ref()
5040                    .cursor_read::<tables::HashedAccounts>()
5041                    .unwrap()
5042                    .seek_exact(hashed_address)
5043                    .unwrap();
5044                assert!(
5045                    ha_entry.is_some(),
5046                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5047                );
5048
5049                for s in 1..=slots_per_account as u64 {
5050                    let slot = U256::from(s + acct_idx as u64 * 100);
5051                    let slot_key = B256::from(slot);
5052                    let hashed_slot = keccak256(slot_key);
5053
5054                    let hs_entry = provider
5055                        .tx_ref()
5056                        .cursor_dup_read::<tables::HashedStorages>()
5057                        .unwrap()
5058                        .seek_by_key_subkey(hashed_address, hashed_slot)
5059                        .unwrap();
5060                    assert!(
5061                        hs_entry.is_some(),
5062                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5063                    );
5064                    let entry = hs_entry.unwrap();
5065                    assert_eq!(entry.key, hashed_slot);
5066                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5067                }
5068            }
5069        }
5070
5071        for block_num in 1..=num_blocks {
5072            let header = provider.header_by_number(block_num).unwrap();
5073            assert!(header.is_some(), "Header missing for block {block_num}");
5074
5075            let indices = provider.block_body_indices(block_num).unwrap();
5076            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5077        }
5078
5079        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5080        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5081
5082        if mode == StorageMode::V2 {
5083            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5084            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5085
5086            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5087            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5088
5089            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5090            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5091
5092            provider.static_file_provider().commit().unwrap();
5093            let sf = factory.static_file_provider();
5094
5095            for block_num in 1..=num_blocks {
5096                let account_cs = sf.account_block_changeset(block_num).unwrap();
5097                assert!(
5098                    !account_cs.is_empty(),
5099                    "v2: static file AccountChangeSets should exist for block {block_num}"
5100                );
5101
5102                let storage_cs = sf.storage_changeset(block_num).unwrap();
5103                assert!(
5104                    !storage_cs.is_empty(),
5105                    "v2: static file StorageChangeSets should exist for block {block_num}"
5106                );
5107
5108                for (_, entry) in &storage_cs {
5109                    assert!(
5110                        entry.key != keccak256(entry.key),
5111                        "v2: static file storage changeset should have plain slot keys"
5112                    );
5113                }
5114            }
5115
5116            let rocksdb = factory.rocksdb_provider();
5117            for block_num in 1..=num_blocks {
5118                for acct_idx in 0..accounts_per_block {
5119                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5120                    let shards = rocksdb.account_history_shards(address).unwrap();
5121                    assert!(
5122                        !shards.is_empty(),
5123                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5124                    );
5125
5126                    for s in 1..=slots_per_account as u64 {
5127                        let slot = U256::from(s + acct_idx as u64 * 100);
5128                        let slot_key = B256::from(slot);
5129                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5130                        assert!(
5131                            !shards.is_empty(),
5132                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5133                        );
5134                    }
5135                }
5136            }
5137        } else {
5138            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5139            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5140
5141            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5142            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5143
5144            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5145            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5146
5147            for block_num in 1..=num_blocks {
5148                let storage_entries: Vec<_> = provider
5149                    .tx_ref()
5150                    .cursor_dup_read::<tables::StorageChangeSets>()
5151                    .unwrap()
5152                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5153                    .unwrap()
5154                    .collect::<Result<Vec<_>, _>>()
5155                    .unwrap();
5156                assert!(
5157                    !storage_entries.is_empty(),
5158                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5159                );
5160
5161                for (_, entry) in &storage_entries {
5162                    let slot_key = B256::from(entry.key);
5163                    assert!(
5164                        slot_key != keccak256(slot_key),
5165                        "v1: storage changeset keys should be plain (not hashed)"
5166                    );
5167                }
5168            }
5169
5170            let mdbx_account_history =
5171                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5172            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5173
5174            let mdbx_storage_history =
5175                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5176            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5177        }
5178    }
5179
5180    #[test]
5181    fn test_save_blocks_v1_table_assertions() {
5182        run_save_blocks_and_verify(StorageMode::V1);
5183    }
5184
5185    #[test]
5186    fn test_save_blocks_v2_table_assertions() {
5187        run_save_blocks_and_verify(StorageMode::V2);
5188    }
5189
5190    #[test]
5191    fn test_write_and_remove_state_roundtrip_v2() {
5192        let factory = create_test_provider_factory();
5193        let storage_settings = StorageSettings::v2();
5194        assert!(storage_settings.use_hashed_state());
5195        factory.set_storage_settings_cache(storage_settings);
5196
5197        let address = Address::with_last_byte(1);
5198        let hashed_address = keccak256(address);
5199        let slot = U256::from(5);
5200        let slot_key = B256::from(slot);
5201        let hashed_slot = keccak256(slot_key);
5202
5203        {
5204            let sf = factory.static_file_provider();
5205            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5206            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5207            hw.append_header(&h0, &B256::ZERO).unwrap();
5208            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5209            hw.append_header(&h1, &B256::ZERO).unwrap();
5210            hw.commit().unwrap();
5211
5212            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5213            aw.append_account_changeset(vec![], 0).unwrap();
5214            aw.commit().unwrap();
5215
5216            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5217            sw.append_storage_changeset(vec![], 0).unwrap();
5218            sw.commit().unwrap();
5219        }
5220
5221        {
5222            let provider_rw = factory.provider_rw().unwrap();
5223            provider_rw
5224                .tx
5225                .put::<tables::BlockBodyIndices>(
5226                    0,
5227                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5228                )
5229                .unwrap();
5230            provider_rw
5231                .tx
5232                .put::<tables::BlockBodyIndices>(
5233                    1,
5234                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5235                )
5236                .unwrap();
5237            provider_rw
5238                .tx
5239                .cursor_write::<tables::HashedAccounts>()
5240                .unwrap()
5241                .upsert(
5242                    hashed_address,
5243                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5244                )
5245                .unwrap();
5246            provider_rw.commit().unwrap();
5247        }
5248
5249        let provider_rw = factory.provider_rw().unwrap();
5250
5251        let bundle = BundleState::builder(1..=1)
5252            .state_present_account_info(
5253                address,
5254                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5255            )
5256            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5257            .revert_account_info(1, address, Some(None))
5258            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5259            .build();
5260
5261        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5262
5263        provider_rw
5264            .write_state(
5265                &execution_outcome,
5266                OriginalValuesKnown::Yes,
5267                StateWriteConfig {
5268                    write_receipts: false,
5269                    write_account_changesets: true,
5270                    write_storage_changesets: true,
5271                },
5272            )
5273            .unwrap();
5274
5275        let hashed_state =
5276            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5277        provider_rw.write_hashed_state(&hashed_state).unwrap();
5278
5279        let hashed_account = provider_rw
5280            .tx
5281            .cursor_read::<tables::HashedAccounts>()
5282            .unwrap()
5283            .seek_exact(hashed_address)
5284            .unwrap()
5285            .unwrap()
5286            .1;
5287        assert_eq!(hashed_account.nonce, 1);
5288
5289        let hashed_entry = provider_rw
5290            .tx
5291            .cursor_dup_read::<tables::HashedStorages>()
5292            .unwrap()
5293            .seek_by_key_subkey(hashed_address, hashed_slot)
5294            .unwrap()
5295            .unwrap();
5296        assert_eq!(hashed_entry.key, hashed_slot);
5297        assert_eq!(hashed_entry.value, U256::from(10));
5298
5299        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5300        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5301
5302        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5303        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5304
5305        provider_rw.static_file_provider().commit().unwrap();
5306
5307        let sf = factory.static_file_provider();
5308        let storage_cs = sf.storage_changeset(1).unwrap();
5309        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5310        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5311
5312        provider_rw.remove_state_above(0).unwrap();
5313
5314        let restored_account = provider_rw
5315            .tx
5316            .cursor_read::<tables::HashedAccounts>()
5317            .unwrap()
5318            .seek_exact(hashed_address)
5319            .unwrap();
5320        assert!(
5321            restored_account.is_none(),
5322            "v2: account should be removed (didn't exist before block 1)"
5323        );
5324
5325        let storage_gone = provider_rw
5326            .tx
5327            .cursor_dup_read::<tables::HashedStorages>()
5328            .unwrap()
5329            .seek_by_key_subkey(hashed_address, hashed_slot)
5330            .unwrap();
5331        assert!(
5332            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5333            "v2: storage should be reverted (removed or different key)"
5334        );
5335
5336        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5337        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5338
5339        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5340        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5341    }
5342
5343    #[test]
5344    fn test_unwind_storage_history_indices_v2() {
5345        let factory = create_test_provider_factory();
5346        factory.set_storage_settings_cache(StorageSettings::v2());
5347
5348        let address = Address::with_last_byte(1);
5349        let slot_key = B256::from(U256::from(42));
5350
5351        {
5352            let rocksdb = factory.rocksdb_provider();
5353            let mut batch = rocksdb.batch();
5354            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5355            batch.commit().unwrap();
5356
5357            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5358            assert!(!shards.is_empty(), "history should be written to rocksdb");
5359        }
5360
5361        let provider_rw = factory.provider_rw().unwrap();
5362
5363        let changesets = vec![
5364            (
5365                BlockNumberAddress((7, address)),
5366                StorageEntry { key: slot_key, value: U256::from(5) },
5367            ),
5368            (
5369                BlockNumberAddress((10, address)),
5370                StorageEntry { key: slot_key, value: U256::from(8) },
5371            ),
5372        ];
5373
5374        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5375        assert_eq!(count, 2);
5376
5377        provider_rw.commit().unwrap();
5378
5379        let rocksdb = factory.rocksdb_provider();
5380        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5381
5382        assert!(
5383            !shards.is_empty(),
5384            "history shards should still exist with block 3 after partial unwind"
5385        );
5386
5387        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5388        assert!(all_blocks.contains(&3), "block 3 should remain");
5389        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5390        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5391    }
5392}