Skip to main content

reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::Database,
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
55    StorageSlotKey,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, ChangesetEntry, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
65    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted, StoredNibbles,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use std::{
77    cmp::Ordering,
78    collections::{BTreeMap, BTreeSet},
79    fmt::Debug,
80    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
81    sync::Arc,
82    time::Instant,
83};
84use tracing::{debug, instrument, trace};
85
/// Order in which the storage backends are committed.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CommitOrder {
    /// Regular ordering: static files are committed first, followed by `RocksDB`, with MDBX
    /// last.
    #[default]
    Normal,
    /// Ordering for unwinds: MDBX is committed first, followed by `RocksDB`, with static files
    /// last.
    ///
    /// Committing in this order allows recovery by truncating static files on restart.
    Unwind,
}

impl CommitOrder {
    /// Reports whether the unwind commit order is selected.
    pub const fn is_unwind(&self) -> bool {
        match self {
            Self::Unwind => true,
            Self::Normal => false,
        }
    }
}
103
104/// A [`DatabaseProvider`] that holds a read-only database transaction.
105pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
106
107/// A [`DatabaseProvider`] that holds a read-write database transaction.
108///
109/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
110/// Once that issue is solved, we can probably revert back to being an alias type.
111#[derive(Debug)]
112pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
113    pub DatabaseProvider<<DB as Database>::TXMut, N>,
114);
115
116impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
117    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
118
119    fn deref(&self) -> &Self::Target {
120        &self.0
121    }
122}
123
124impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
125    fn deref_mut(&mut self) -> &mut Self::Target {
126        &mut self.0
127    }
128}
129
130impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
131    for DatabaseProviderRW<DB, N>
132{
133    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
134        &self.0
135    }
136}
137
138impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
139    /// Commit database transaction and static file if it exists.
140    pub fn commit(self) -> ProviderResult<()> {
141        self.0.commit()
142    }
143
144    /// Consume `DbTx` or `DbTxMut`.
145    pub fn into_tx(self) -> <DB as Database>::TXMut {
146        self.0.into_tx()
147    }
148
149    /// Override the minimum pruning distance for testing purposes.
150    #[cfg(any(test, feature = "test-utils"))]
151    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
152        self.0.minimum_pruning_distance = distance;
153        self
154    }
155}
156
157impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
158    for DatabaseProvider<<DB as Database>::TXMut, N>
159{
160    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
161        provider.0
162    }
163}
164
/// Selects how much data [`DatabaseProvider::save_blocks`] persists.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SaveBlocksMode {
    /// Everything is written: block structure, receipts, state and trie.
    /// This is the mode used by engine/production code.
    Full,
    /// Only the block structure (headers, txs, senders, indices) is written.
    /// Receipts/state/trie are skipped - they may come later via separate calls.
    /// This is the mode used by `insert_block`.
    BlocksOnly,
}

impl SaveBlocksMode {
    /// Reports whether state is written as well, which is only the case for
    /// [`SaveBlocksMode::Full`].
    pub const fn with_state(self) -> bool {
        match self {
            Self::Full => true,
            Self::BlocksOnly => false,
        }
    }
}
183
184/// A provider struct that fetches data from the database.
185/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
186pub struct DatabaseProvider<TX, N: NodeTypes> {
187    /// Database transaction.
188    tx: TX,
189    /// Chain spec
190    chain_spec: Arc<N::ChainSpec>,
191    /// Static File provider
192    static_file_provider: StaticFileProvider<N::Primitives>,
193    /// Pruning configuration
194    prune_modes: PruneModes,
195    /// Node storage handler.
196    storage: Arc<N::Storage>,
197    /// Storage configuration settings for this node
198    storage_settings: Arc<RwLock<StorageSettings>>,
199    /// `RocksDB` provider
200    rocksdb_provider: RocksDBProvider,
201    /// Changeset cache for trie unwinding
202    changeset_cache: ChangesetCache,
203    /// Task runtime for spawning parallel I/O work.
204    runtime: reth_tasks::Runtime,
205    /// Pending `RocksDB` batches to be committed at provider commit time.
206    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
207    pending_rocksdb_batches: PendingRocksDBBatches,
208    /// Commit order for database operations.
209    commit_order: CommitOrder,
210    /// Minimum distance from tip required for pruning
211    minimum_pruning_distance: u64,
212    /// Database provider metrics
213    metrics: metrics::DatabaseProviderMetrics,
214}
215
216impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        let mut s = f.debug_struct("DatabaseProvider");
219        s.field("tx", &self.tx)
220            .field("chain_spec", &self.chain_spec)
221            .field("static_file_provider", &self.static_file_provider)
222            .field("prune_modes", &self.prune_modes)
223            .field("storage", &self.storage)
224            .field("storage_settings", &self.storage_settings)
225            .field("rocksdb_provider", &self.rocksdb_provider)
226            .field("changeset_cache", &self.changeset_cache)
227            .field("runtime", &self.runtime)
228            .field("pending_rocksdb_batches", &"<pending batches>")
229            .field("commit_order", &self.commit_order)
230            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
231            .finish()
232    }
233}
234
235impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
236    /// Returns reference to prune modes.
237    pub const fn prune_modes_ref(&self) -> &PruneModes {
238        &self.prune_modes
239    }
240}
241
242impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
243    /// State provider for latest state
244    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
245        trace!(target: "providers::db", "Returning latest state provider");
246        Box::new(LatestStateProviderRef::new(self))
247    }
248
249    /// Storage provider for state at that given block hash
250    pub fn history_by_block_hash<'a>(
251        &'a self,
252        block_hash: BlockHash,
253    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
254        let mut block_number =
255            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
256        if block_number == self.best_block_number().unwrap_or_default() &&
257            block_number == self.last_block_number().unwrap_or_default()
258        {
259            return Ok(Box::new(LatestStateProviderRef::new(self)))
260        }
261
262        // +1 as the changeset that we want is the one that was applied after this block.
263        block_number += 1;
264
265        let account_history_prune_checkpoint =
266            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
267        let storage_history_prune_checkpoint =
268            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
269
270        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
271
272        // If we pruned account or storage history, we can't return state on every historical block.
273        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
274        if let Some(prune_checkpoint_block_number) =
275            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
276        {
277            state_provider = state_provider.with_lowest_available_account_history_block_number(
278                prune_checkpoint_block_number + 1,
279            );
280        }
281        if let Some(prune_checkpoint_block_number) =
282            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
283        {
284            state_provider = state_provider.with_lowest_available_storage_history_block_number(
285                prune_checkpoint_block_number + 1,
286            );
287        }
288
289        Ok(Box::new(state_provider))
290    }
291
292    #[cfg(feature = "test-utils")]
293    /// Sets the prune modes for provider.
294    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
295        self.prune_modes = prune_modes;
296    }
297}
298
299impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
300    type Primitives = N::Primitives;
301}
302
303impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
304    /// Returns a static file provider
305    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
306        self.static_file_provider.clone()
307    }
308
309    fn get_static_file_writer(
310        &self,
311        block: BlockNumber,
312        segment: StaticFileSegment,
313    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
314        self.static_file_provider.get_writer(block, segment)
315    }
316}
317
318impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
319    /// Returns the `RocksDB` provider.
320    fn rocksdb_provider(&self) -> RocksDBProvider {
321        self.rocksdb_provider.clone()
322    }
323
324    #[cfg(all(unix, feature = "rocksdb"))]
325    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
326        self.pending_rocksdb_batches.lock().push(batch);
327    }
328
329    #[cfg(all(unix, feature = "rocksdb"))]
330    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
331        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
332        for batch in batches {
333            self.rocksdb_provider.commit_batch(batch)?;
334        }
335        Ok(())
336    }
337}
338
339impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
340    for DatabaseProvider<TX, N>
341{
342    type ChainSpec = N::ChainSpec;
343
344    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
345        self.chain_spec.clone()
346    }
347}
348
349impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
350    /// Creates a provider with an inner read-write transaction.
351    #[allow(clippy::too_many_arguments)]
352    fn new_rw_inner(
353        tx: TX,
354        chain_spec: Arc<N::ChainSpec>,
355        static_file_provider: StaticFileProvider<N::Primitives>,
356        prune_modes: PruneModes,
357        storage: Arc<N::Storage>,
358        storage_settings: Arc<RwLock<StorageSettings>>,
359        rocksdb_provider: RocksDBProvider,
360        changeset_cache: ChangesetCache,
361        runtime: reth_tasks::Runtime,
362        commit_order: CommitOrder,
363    ) -> Self {
364        Self {
365            tx,
366            chain_spec,
367            static_file_provider,
368            prune_modes,
369            storage,
370            storage_settings,
371            rocksdb_provider,
372            changeset_cache,
373            runtime,
374            pending_rocksdb_batches: Default::default(),
375            commit_order,
376            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
377            metrics: metrics::DatabaseProviderMetrics::default(),
378        }
379    }
380
381    /// Creates a provider with an inner read-write transaction using normal commit order.
382    #[allow(clippy::too_many_arguments)]
383    pub fn new_rw(
384        tx: TX,
385        chain_spec: Arc<N::ChainSpec>,
386        static_file_provider: StaticFileProvider<N::Primitives>,
387        prune_modes: PruneModes,
388        storage: Arc<N::Storage>,
389        storage_settings: Arc<RwLock<StorageSettings>>,
390        rocksdb_provider: RocksDBProvider,
391        changeset_cache: ChangesetCache,
392        runtime: reth_tasks::Runtime,
393    ) -> Self {
394        Self::new_rw_inner(
395            tx,
396            chain_spec,
397            static_file_provider,
398            prune_modes,
399            storage,
400            storage_settings,
401            rocksdb_provider,
402            changeset_cache,
403            runtime,
404            CommitOrder::Normal,
405        )
406    }
407
408    /// Creates a provider with an inner read-write transaction using unwind commit order.
409    #[allow(clippy::too_many_arguments)]
410    pub fn new_unwind_rw(
411        tx: TX,
412        chain_spec: Arc<N::ChainSpec>,
413        static_file_provider: StaticFileProvider<N::Primitives>,
414        prune_modes: PruneModes,
415        storage: Arc<N::Storage>,
416        storage_settings: Arc<RwLock<StorageSettings>>,
417        rocksdb_provider: RocksDBProvider,
418        changeset_cache: ChangesetCache,
419        runtime: reth_tasks::Runtime,
420    ) -> Self {
421        Self::new_rw_inner(
422            tx,
423            chain_spec,
424            static_file_provider,
425            prune_modes,
426            storage,
427            storage_settings,
428            rocksdb_provider,
429            changeset_cache,
430            runtime,
431            CommitOrder::Unwind,
432        )
433    }
434}
435
436impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
437    fn as_ref(&self) -> &Self {
438        self
439    }
440}
441
442impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
443    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
444    ///
445    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
446    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
447    where
448        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
449    {
450        #[cfg(all(unix, feature = "rocksdb"))]
451        let rocksdb = self.rocksdb_provider();
452        #[cfg(all(unix, feature = "rocksdb"))]
453        let rocksdb_batch = rocksdb.batch();
454        #[cfg(not(all(unix, feature = "rocksdb")))]
455        let rocksdb_batch = ();
456
457        let (result, raw_batch) = f(rocksdb_batch)?;
458
459        #[cfg(all(unix, feature = "rocksdb"))]
460        if let Some(batch) = raw_batch {
461            self.set_pending_rocksdb_batch(batch);
462        }
463        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
464
465        Ok(result)
466    }
467
468    /// Creates the context for static file writes.
469    fn static_file_write_ctx(
470        &self,
471        save_mode: SaveBlocksMode,
472        first_block: BlockNumber,
473        last_block: BlockNumber,
474    ) -> ProviderResult<StaticFileWriteCtx> {
475        let tip = self.last_block_number()?.max(last_block);
476        Ok(StaticFileWriteCtx {
477            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
478                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
479            write_receipts: save_mode.with_state() &&
480                EitherWriter::receipts_destination(self).is_static_file(),
481            write_account_changesets: save_mode.with_state() &&
482                EitherWriterDestination::account_changesets(self).is_static_file(),
483            write_storage_changesets: save_mode.with_state() &&
484                EitherWriterDestination::storage_changesets(self).is_static_file(),
485            tip,
486            receipts_prune_mode: self.prune_modes.receipts,
487            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
488            receipts_prunable: self
489                .static_file_provider
490                .get_highest_static_file_tx(StaticFileSegment::Receipts)
491                .is_none() &&
492                PruneMode::Distance(self.minimum_pruning_distance)
493                    .should_prune(first_block, tip),
494        })
495    }
496
497    /// Creates the context for `RocksDB` writes.
498    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
499    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
500        RocksDBWriteCtx {
501            first_block_number: first_block,
502            prune_tx_lookup: self.prune_modes.transaction_lookup,
503            storage_settings: self.cached_storage_settings(),
504            pending_batches: self.pending_rocksdb_batches.clone(),
505        }
506    }
507
508    /// Writes executed blocks and state to storage.
509    ///
510    /// This method parallelizes static file (SF) writes with MDBX writes.
511    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
512    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
513    ///
514    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
515    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
516    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
517    pub fn save_blocks(
518        &self,
519        blocks: Vec<ExecutedBlock<N::Primitives>>,
520        save_mode: SaveBlocksMode,
521    ) -> ProviderResult<()> {
522        if blocks.is_empty() {
523            debug!(target: "providers::db", "Attempted to write empty block range");
524            return Ok(())
525        }
526
527        let total_start = Instant::now();
528        let block_count = blocks.len() as u64;
529        let first_number = blocks.first().unwrap().recovered_block().number();
530        let last_block_number = blocks.last().unwrap().recovered_block().number();
531
532        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
533
534        // Compute tx_nums upfront (both threads need these)
535        let first_tx_num = self
536            .tx
537            .cursor_read::<tables::TransactionBlocks>()?
538            .last()?
539            .map(|(n, _)| n + 1)
540            .unwrap_or_default();
541
542        let tx_nums: Vec<TxNumber> = {
543            let mut nums = Vec::with_capacity(blocks.len());
544            let mut current = first_tx_num;
545            for block in &blocks {
546                nums.push(current);
547                current += block.recovered_block().body().transaction_count() as u64;
548            }
549            nums
550        };
551
552        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };
553
554        // avoid capturing &self.tx in scope below.
555        let sf_provider = &self.static_file_provider;
556        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
557        #[cfg(all(unix, feature = "rocksdb"))]
558        let rocksdb_provider = self.rocksdb_provider.clone();
559        #[cfg(all(unix, feature = "rocksdb"))]
560        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
561        #[cfg(all(unix, feature = "rocksdb"))]
562        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
563
564        let mut sf_result = None;
565        #[cfg(all(unix, feature = "rocksdb"))]
566        let mut rocksdb_result = None;
567
568        // Write to all backends in parallel.
569        let runtime = &self.runtime;
570        runtime.storage_pool().in_place_scope(|s| {
571            // SF writes
572            s.spawn(|_| {
573                let start = Instant::now();
574                sf_result = Some(
575                    sf_provider
576                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
577                        .map(|()| start.elapsed()),
578                );
579            });
580
581            // RocksDB writes
582            #[cfg(all(unix, feature = "rocksdb"))]
583            if rocksdb_enabled {
584                s.spawn(|_| {
585                    let start = Instant::now();
586                    rocksdb_result = Some(
587                        rocksdb_provider
588                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
589                            .map(|()| start.elapsed()),
590                    );
591                });
592            }
593
594            // MDBX writes
595            let mdbx_start = Instant::now();
596
597            // Collect all transaction hashes across all blocks, sort them, and write in batch
598            if !self.cached_storage_settings().storage_v2 &&
599                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
600            {
601                let start = Instant::now();
602                let total_tx_count: usize =
603                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
604                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
605                for (i, block) in blocks.iter().enumerate() {
606                    let recovered_block = block.recovered_block();
607                    let mut tx_num = tx_nums[i];
608                    for transaction in recovered_block.body().transactions_iter() {
609                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
610                        tx_num += 1;
611                    }
612                }
613
614                // Sort by hash for optimal MDBX insertion performance
615                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
616
617                // Write all transaction hash numbers in a single batch
618                self.with_rocksdb_batch(|batch| {
619                    let mut tx_hash_writer =
620                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
621                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
622                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
623                    Ok(((), raw_batch))
624                })?;
625                self.metrics.record_duration(
626                    metrics::Action::InsertTransactionHashNumbers,
627                    start.elapsed(),
628                );
629            }
630
631            for (i, block) in blocks.iter().enumerate() {
632                let recovered_block = block.recovered_block();
633
634                let start = Instant::now();
635                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
636                timings.insert_block += start.elapsed();
637
638                if save_mode.with_state() {
639                    let execution_output = block.execution_outcome();
640
641                    // Write state and changesets to the database.
642                    // Must be written after blocks because of the receipt lookup.
643                    // Skip receipts/account changesets if they're being written to static files.
644                    let start = Instant::now();
645                    self.write_state(
646                        WriteStateInput::Single {
647                            outcome: execution_output,
648                            block: recovered_block.number(),
649                        },
650                        OriginalValuesKnown::No,
651                        StateWriteConfig {
652                            write_receipts: !sf_ctx.write_receipts,
653                            write_account_changesets: !sf_ctx.write_account_changesets,
654                            write_storage_changesets: !sf_ctx.write_storage_changesets,
655                        },
656                    )?;
657                    timings.write_state += start.elapsed();
658                }
659            }
660
661            // Write all hashed state and trie updates in single batches.
662            // This reduces cursor open/close overhead from N calls to 1.
663            if save_mode.with_state() {
664                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
665                let start = Instant::now();
666                let merged_hashed_state = HashedPostStateSorted::merge_batch(
667                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
668                );
669                if !merged_hashed_state.is_empty() {
670                    self.write_hashed_state(&merged_hashed_state)?;
671                }
672                timings.write_hashed_state += start.elapsed();
673
674                let start = Instant::now();
675                let merged_trie =
676                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
677                if !merged_trie.is_empty() {
678                    self.write_trie_updates_sorted(&merged_trie)?;
679                }
680                timings.write_trie_updates += start.elapsed();
681            }
682
683            // Full mode: update history indices
684            if save_mode.with_state() {
685                let start = Instant::now();
686                self.update_history_indices(first_number..=last_block_number)?;
687                timings.update_history_indices = start.elapsed();
688            }
689
690            // Update pipeline progress
691            let start = Instant::now();
692            self.update_pipeline_stages(last_block_number, false)?;
693            timings.update_pipeline_stages = start.elapsed();
694
695            timings.mdbx = mdbx_start.elapsed();
696
697            Ok::<_, ProviderError>(())
698        })?;
699
700        // Collect results from spawned tasks
701        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
702
703        #[cfg(all(unix, feature = "rocksdb"))]
704        if rocksdb_enabled {
705            timings.rocksdb = rocksdb_result.ok_or_else(|| {
706                ProviderError::Database(reth_db_api::DatabaseError::Other(
707                    "RocksDB thread panicked".into(),
708                ))
709            })??;
710        }
711
712        timings.total = total_start.elapsed();
713
714        self.metrics.record_save_blocks(&timings);
715        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
716
717        Ok(())
718    }
719
720    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
721    ///
722    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
723    #[instrument(level = "debug", target = "providers::db", skip_all)]
724    fn insert_block_mdbx_only(
725        &self,
726        block: &RecoveredBlock<BlockTy<N>>,
727        first_tx_num: TxNumber,
728    ) -> ProviderResult<StoredBlockBodyIndices> {
729        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
730            EitherWriterDestination::senders(self).is_database()
731        {
732            let start = Instant::now();
733            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
734            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
735            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
736                cursor.append(tx_num, &sender)?;
737            }
738            self.metrics
739                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
740        }
741
742        let block_number = block.number();
743        let tx_count = block.body().transaction_count() as u64;
744
745        let start = Instant::now();
746        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
747        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
748
749        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
750
751        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
752    }
753
754    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
755    /// `Ommers`/`Withdrawals`).
756    fn write_block_body_indices(
757        &self,
758        block_number: BlockNumber,
759        body: &BodyTy<N>,
760        first_tx_num: TxNumber,
761        tx_count: u64,
762    ) -> ProviderResult<()> {
763        // MDBX: BlockBodyIndices
764        let start = Instant::now();
765        self.tx
766            .cursor_write::<tables::BlockBodyIndices>()?
767            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
768        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
769
770        // MDBX: TransactionBlocks (last tx -> block mapping)
771        if tx_count > 0 {
772            let start = Instant::now();
773            self.tx
774                .cursor_write::<tables::TransactionBlocks>()?
775                .append(first_tx_num + tx_count - 1, &block_number)?;
776            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
777        }
778
779        // MDBX: Ommers/Withdrawals
780        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
781
782        Ok(())
783    }
784
785    /// Unwinds trie state starting at and including the given block.
786    ///
787    /// This includes calculating the resulted state root and comparing it with the parent block
788    /// state root.
789    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
790        let changed_accounts = self.account_changesets_range(from..)?;
791
792        // Unwind account hashes.
793        self.unwind_account_hashing(changed_accounts.iter())?;
794
795        // Unwind account history indices.
796        self.unwind_account_history_indices(changed_accounts.iter())?;
797
798        let changed_storages = self.storage_changesets_range(from..)?;
799
800        // Unwind storage hashes.
801        self.unwind_storage_hashing(changed_storages.iter().copied())?;
802
803        // Unwind storage history indices.
804        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
805
806        // Unwind accounts/storages trie tables using the revert.
807        // Get the database tip block number
808        let db_tip_block = self
809            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
810            .as_ref()
811            .map(|chk| chk.block_number)
812            .ok_or_else(|| ProviderError::InsufficientChangesets {
813                requested: from,
814                available: 0..=0,
815            })?;
816
817        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
818        self.write_trie_updates_sorted(&trie_revert)?;
819
820        Ok(())
821    }
822
823    /// Removes receipts from all transactions starting with provided number (inclusive).
824    fn remove_receipts_from(
825        &self,
826        from_tx: TxNumber,
827        last_block: BlockNumber,
828    ) -> ProviderResult<()> {
829        // iterate over block body and remove receipts
830        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
831
832        if EitherWriter::receipts_destination(self).is_static_file() {
833            let static_file_receipt_num =
834                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
835
836            let to_delete = static_file_receipt_num
837                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
838                .unwrap_or_default();
839
840            self.static_file_provider
841                .latest_writer(StaticFileSegment::Receipts)?
842                .prune_receipts(to_delete, last_block)?;
843        }
844
845        Ok(())
846    }
847}
848
849impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
850    fn try_into_history_at_block(
851        self,
852        mut block_number: BlockNumber,
853    ) -> ProviderResult<StateProviderBox> {
854        let best_block = self.best_block_number().unwrap_or_default();
855
856        // Reject requests for blocks beyond the best block
857        if block_number > best_block {
858            return Err(ProviderError::BlockNotExecuted {
859                requested: block_number,
860                executed: best_block,
861            });
862        }
863
864        // If requesting state at the best block, use the latest state provider
865        if block_number == best_block {
866            return Ok(Box::new(LatestStateProvider::new(self)));
867        }
868
869        // +1 as the changeset that we want is the one that was applied after this block.
870        block_number += 1;
871
872        let account_history_prune_checkpoint =
873            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
874        let storage_history_prune_checkpoint =
875            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
876
877        let mut state_provider = HistoricalStateProvider::new(self, block_number);
878
879        // If we pruned account or storage history, we can't return state on every historical block.
880        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
881        if let Some(prune_checkpoint_block_number) =
882            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
883        {
884            state_provider = state_provider.with_lowest_available_account_history_block_number(
885                prune_checkpoint_block_number + 1,
886            );
887        }
888        if let Some(prune_checkpoint_block_number) =
889            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
890        {
891            state_provider = state_provider.with_lowest_available_storage_history_block_number(
892                prune_checkpoint_block_number + 1,
893            );
894        }
895
896        Ok(Box::new(state_provider))
897    }
898}
899
/// For a given key, unwind all history shards that contain block numbers at or above the given
/// block number.
///
/// S - Sharded key subtype.
/// T - Table to walk over.
/// C - Cursor implementation.
///
/// The cursor is positioned with `seek_exact(start_key)` and then moved with `prev()`, i.e. the
/// walk starts at `start_key` and proceeds towards lower keys. It stops as soon as
/// `shard_belongs_to_key` rejects an entry or the cursor runs out of entries.
///
/// Every shard that is visited is deleted from the table. What happens next depends on the shard:
/// - Shards whose first block number is at or above `block_number` stay deleted and the walk
///   continues with the previous entry.
/// - The first shard that starts below `block_number` ends the walk. Its block numbers below
///   `block_number` are returned so that the caller can reinsert them (all of them if the shard
///   lies entirely below `block_number`).
///
/// Returns an empty vector if no entry exists at `start_key` or if every visited shard was at or
/// above `block_number`.
///
/// # Panics
///
/// Panics if a visited shard holds an empty block number list.
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Start from the given key and iterate through shards.
    // `seek_exact` yields `None` when there is no entry at exactly `start_key`.
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, break.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Always delete the current shard from the database first.
        // Anything worth keeping is returned to the caller for reinsertion.
        cursor.delete_current()?;

        // Get the first block number in this shard; the cases below compare it against the
        // unwinding point.
        let first = list.iter().next().expect("List can't be empty");

        // Case 1: Entire shard is at or above the unwinding point.
        // Keep it deleted (don't return anything for reinsertion) and step to the previous entry.
        if first >= block_number {
            item = cursor.prev()?;
            continue
        }
        // Case 2: This is a boundary shard (spans across the unwinding point).
        // The shard starts below the unwinding point and its key's highest block number is at or
        // above it.
        else if block_number <= sharded_key.as_ref().highest_block_number {
            // Return only the block numbers that are below the unwinding point.
            // These are meant to be reinserted to preserve the historical data.
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Case 3: Entire shard is below the unwinding point.
        // It has already been deleted above, so return all of its block numbers for reinsertion.
        return Ok(list.iter().collect::<Vec<_>>())
    }

    // No shards found, or every visited shard was removed entirely.
    Ok(Vec::new())
}
962
impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    ///
    /// All arguments are stored as-is. The remaining fields are initialised here:
    /// `pending_rocksdb_batches` and `metrics` start from their defaults, `commit_order` is
    /// [`CommitOrder::Normal`], and `minimum_pruning_distance` is
    /// `MINIMUM_UNWIND_SAFE_DISTANCE`.
    // One argument per stored field, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
        storage_settings: Arc<RwLock<StorageSettings>>,
        rocksdb_provider: RocksDBProvider,
        changeset_cache: ChangesetCache,
        runtime: reth_tasks::Runtime,
    ) -> Self {
        Self {
            tx,
            chain_spec,
            static_file_provider,
            prune_modes,
            storage,
            storage_settings,
            rocksdb_provider,
            changeset_cache,
            runtime,
            pending_rocksdb_batches: Default::default(),
            commit_order: CommitOrder::Normal,
            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
            metrics: metrics::DatabaseProviderMetrics::default(),
        }
    }

    /// Consumes the provider and returns the inner transaction (`DbTx` or `DbTxMut`).
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the inner transaction (`DbTx` or `DbTxMut`).
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a shared reference to the inner transaction (`DbTx` or `DbTxMut`).
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}
1014
1015impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1016    fn recovered_block<H, HF, B, BF>(
1017        &self,
1018        id: BlockHashOrNumber,
1019        _transaction_kind: TransactionVariant,
1020        header_by_number: HF,
1021        construct_block: BF,
1022    ) -> ProviderResult<Option<B>>
1023    where
1024        H: AsRef<HeaderTy<N>>,
1025        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1026        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1027    {
1028        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1029        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1030
1031        // Get the block body
1032        //
1033        // If the body indices are not found, this means that the transactions either do not exist
1034        // in the database yet, or they do exit but are not indexed. If they exist but are not
1035        // indexed, we don't have enough information to return the block anyways, so we return
1036        // `None`.
1037        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1038
1039        let tx_range = body.tx_num_range();
1040
1041        let transactions = if tx_range.is_empty() {
1042            vec![]
1043        } else {
1044            self.transactions_by_tx_range(tx_range.clone())?
1045        };
1046
1047        let body = self
1048            .storage
1049            .reader()
1050            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1051            .pop()
1052            .ok_or(ProviderError::InvalidStorageOutput)?;
1053
1054        let senders = if tx_range.is_empty() {
1055            vec![]
1056        } else {
1057            let known_senders: HashMap<TxNumber, Address> =
1058                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1059
1060            let mut senders = Vec::with_capacity(body.transactions().len());
1061            for (tx_num, tx) in tx_range.zip(body.transactions()) {
1062                match known_senders.get(&tx_num) {
1063                    None => {
1064                        let sender = tx.recover_signer_unchecked()?;
1065                        senders.push(sender);
1066                    }
1067                    Some(sender) => senders.push(*sender),
1068                }
1069            }
1070            senders
1071        };
1072
1073        construct_block(header, body, senders)
1074    }
1075
1076    /// Returns a range of blocks from the database.
1077    ///
1078    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1079    /// construct blocks from the following inputs:
1080    ///     – Header
1081    ///     - Range of transaction numbers
1082    ///     – Ommers
1083    ///     – Withdrawals
1084    ///     – Senders
1085    fn block_range<F, H, HF, R>(
1086        &self,
1087        range: RangeInclusive<BlockNumber>,
1088        headers_range: HF,
1089        mut assemble_block: F,
1090    ) -> ProviderResult<Vec<R>>
1091    where
1092        H: AsRef<HeaderTy<N>>,
1093        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1094        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1095    {
1096        if range.is_empty() {
1097            return Ok(Vec::new())
1098        }
1099
1100        let len = range.end().saturating_sub(*range.start()) as usize;
1101        let mut blocks = Vec::with_capacity(len);
1102
1103        let headers = headers_range(range.clone())?;
1104
1105        // If the body indices are not found, this means that the transactions either do
1106        // not exist in the database yet, or they do exit but are
1107        // not indexed. If they exist but are not indexed, we don't
1108        // have enough information to return the block anyways, so
1109        // we skip the block.
1110        let present_headers = self
1111            .block_body_indices_range(range)?
1112            .into_iter()
1113            .map(|b| b.tx_num_range())
1114            .zip(headers)
1115            .collect::<Vec<_>>();
1116
1117        let mut inputs = Vec::with_capacity(present_headers.len());
1118        for (tx_range, header) in &present_headers {
1119            let transactions = if tx_range.is_empty() {
1120                Vec::new()
1121            } else {
1122                self.transactions_by_tx_range(tx_range.clone())?
1123            };
1124
1125            inputs.push((header.as_ref(), transactions));
1126        }
1127
1128        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1129
1130        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1131            blocks.push(assemble_block(header, body, tx_range)?);
1132        }
1133
1134        Ok(blocks)
1135    }
1136
1137    /// Returns a range of blocks from the database, along with the senders of each
1138    /// transaction in the blocks.
1139    ///
1140    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1141    /// construct blocks from the following inputs:
1142    ///     – Header
1143    ///     - Transactions
1144    ///     – Ommers
1145    ///     – Withdrawals
1146    ///     – Senders
1147    fn block_with_senders_range<H, HF, B, BF>(
1148        &self,
1149        range: RangeInclusive<BlockNumber>,
1150        headers_range: HF,
1151        assemble_block: BF,
1152    ) -> ProviderResult<Vec<B>>
1153    where
1154        H: AsRef<HeaderTy<N>>,
1155        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1156        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1157    {
1158        self.block_range(range, headers_range, |header, body, tx_range| {
1159            let senders = if tx_range.is_empty() {
1160                Vec::new()
1161            } else {
1162                let known_senders: HashMap<TxNumber, Address> =
1163                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1164
1165                let mut senders = Vec::with_capacity(body.transactions().len());
1166                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1167                    match known_senders.get(&tx_num) {
1168                        None => {
1169                            // recover the sender from the transaction if not found
1170                            let sender = tx.recover_signer_unchecked()?;
1171                            senders.push(sender);
1172                        }
1173                        Some(sender) => senders.push(*sender),
1174                    }
1175                }
1176
1177                senders
1178            };
1179
1180            assemble_block(header, body, senders)
1181        })
1182    }
1183
    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
    /// account changesets.
    ///
    /// Both changesets are processed back to front. For every address the resulting state entry
    /// is `(old account, current account, storage)`, where `storage` maps a slot key to
    /// `(old value, current value)`:
    /// - the "current" side is read once from the plain state cursors, the first time the
    ///   address or slot is encountered;
    /// - the "old" side is overwritten on every further encounter, so it ends up holding the
    ///   value from the entry closest to the front of the changeset.
    ///
    /// The reverts are grouped by block number and address: the account slot is set to the old
    /// account info and every old storage entry is pushed onto the storage list.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Iterate over the previous values and fetch the plain state values to build the
        // changeset. In the reverts, the double option around the account represents whether the
        // account state is known (first option) and whether the account is removed (second
        // option).
        let mut state: BundleStateInit = HashMap::default();

        // This is not working for blocks that are not at tip, as plain state is not the last
        // state of the end of the range. We should rename the functions or add support to access
        // history state. Accessing history state can be tricky but we are not gaining
        // anything.

        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First encounter: pair the old info with the account currently in plain
                    // state.
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // No account changeset entry was seen for this address, so the plain state
                    // account is used for both the old and the current side.
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            let storage_key = old_storage.key.as_b256();
            match account_state.2.entry(storage_key) {
                hash_map::Entry::Vacant(entry) => {
                    // Only an entry whose key equals `storage_key` is accepted; otherwise the
                    // default storage entry is used as the current value.
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, storage_key)?
                        .filter(|storage| storage.key == storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Slot already seen: only the old value is replaced.
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Record the old storage entry in the reverts of its block and address.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage.into_storage_entry());
        }

        Ok((state, reverts))
    }
1265
    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
    /// `HashedAccounts`/`HashedStorages`. Addresses are hashed via `keccak256` for DB lookups,
    /// while storage keys from changesets are assumed to already be hashed and are used as-is.
    /// The output `BundleStateInit`/`RevertsInit` structures remain keyed by plain address.
    ///
    /// Both changesets are processed back to front. The "current" side of an account or slot is
    /// read from the hashed tables the first time it is encountered; the "old" side is
    /// overwritten on every further encounter.
    fn populate_bundle_state_hashed(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First encounter: look up the current account under the hashed address.
                    let hashed_address = keccak256(address);
                    let new_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Address already seen: only the old account info is replaced.
                    entry.get_mut().0 = old_info;
                }
            }
            // Record the old info in the reverts of this block and (plain) address.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // No account changeset entry was seen for this address, so the currently
                    // stored account is used for both the old and the current side.
                    let hashed_address = keccak256(address);
                    let present_info =
                        hashed_accounts_cursor.seek_exact(hashed_address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // The changeset key is used as-is, both as map key and as DB subkey.
            let storage_key = old_storage.key.as_b256();
            match account_state.2.entry(storage_key) {
                hash_map::Entry::Vacant(entry) => {
                    let hashed_address = keccak256(address);
                    // Only an entry whose key equals `storage_key` is accepted; otherwise the
                    // default storage entry is used as the current value.
                    let new_storage = hashed_storage_cursor
                        .seek_by_key_subkey(hashed_address, storage_key)?
                        .filter(|storage| storage.key == storage_key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Slot already seen: only the old value is replaced.
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Record the old storage entry in the reverts of its block and (plain) address.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage.into_storage_entry());
        }

        Ok((state, reverts))
    }
1336}
1337
1338impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1339    /// Insert history index to the database.
1340    ///
1341    /// For each updated partial key, this function retrieves the last shard from the database
1342    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1343    /// the shards back into the database.
1344    ///
1345    /// This function is used by history indexing stages.
1346    fn append_history_index<P, T>(
1347        &self,
1348        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1349        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1350    ) -> ProviderResult<()>
1351    where
1352        P: Copy,
1353        T: Table<Value = BlockNumberList>,
1354    {
1355        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1356        // will append duplicate entries instead of updating existing ones, causing data corruption.
1357        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1358
1359        let mut cursor = self.tx.cursor_write::<T>()?;
1360
1361        for (partial_key, indices) in index_updates {
1362            let last_key = sharded_key_factory(partial_key, u64::MAX);
1363            let mut last_shard = cursor
1364                .seek_exact(last_key.clone())?
1365                .map(|(_, list)| list)
1366                .unwrap_or_else(BlockNumberList::empty);
1367
1368            last_shard.append(indices).map_err(ProviderError::other)?;
1369
1370            // fast path: all indices fit in one shard
1371            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1372                cursor.upsert(last_key, &last_shard)?;
1373                continue;
1374            }
1375
1376            // slow path: rechunk into multiple shards
1377            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1378            let mut chunks_peekable = chunks.into_iter().peekable();
1379
1380            while let Some(chunk) = chunks_peekable.next() {
1381                let shard = BlockNumberList::new_pre_sorted(chunk);
1382                let highest_block_number = if chunks_peekable.peek().is_some() {
1383                    shard.iter().next_back().expect("`chunks` does not return empty list")
1384                } else {
1385                    // Insert last list with `u64::MAX`.
1386                    u64::MAX
1387                };
1388
1389                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1390            }
1391        }
1392
1393        Ok(())
1394    }
1395}
1396
1397impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1398    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1399        if self.cached_storage_settings().use_hashed_state() {
1400            let hashed_address = keccak256(address);
1401            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1402        } else {
1403            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1404        }
1405    }
1406}
1407
1408impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1409    fn changed_accounts_with_range(
1410        &self,
1411        range: RangeInclusive<BlockNumber>,
1412    ) -> ProviderResult<BTreeSet<Address>> {
1413        let mut reader = EitherReader::new_account_changesets(self)?;
1414
1415        reader.changed_accounts_with_range(range)
1416    }
1417
1418    fn basic_accounts(
1419        &self,
1420        iter: impl IntoIterator<Item = Address>,
1421    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1422        if self.cached_storage_settings().use_hashed_state() {
1423            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1424            Ok(iter
1425                .into_iter()
1426                .map(|address| {
1427                    let hashed_address = keccak256(address);
1428                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1429                })
1430                .collect::<Result<Vec<_>, _>>()?)
1431        } else {
1432            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1433            Ok(iter
1434                .into_iter()
1435                .map(|address| {
1436                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1437                })
1438                .collect::<Result<Vec<_>, _>>()?)
1439        }
1440    }
1441
1442    fn changed_accounts_and_blocks_with_range(
1443        &self,
1444        range: RangeInclusive<BlockNumber>,
1445    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1446        let highest_static_block = self
1447            .static_file_provider
1448            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1449
1450        if let Some(highest) = highest_static_block &&
1451            self.cached_storage_settings().storage_v2
1452        {
1453            let start = *range.start();
1454            let static_end = (*range.end()).min(highest);
1455
1456            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1457            if start <= static_end {
1458                for block in start..=static_end {
1459                    let block_changesets = self.account_block_changeset(block)?;
1460                    for changeset in block_changesets {
1461                        changed_accounts_and_blocks
1462                            .entry(changeset.address)
1463                            .or_default()
1464                            .push(block);
1465                    }
1466                }
1467            }
1468
1469            Ok(changed_accounts_and_blocks)
1470        } else {
1471            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1472
1473            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1474                BTreeMap::new(),
1475                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1476                    let (index, account) = entry?;
1477                    accounts.entry(account.address).or_default().push(index);
1478                    Ok(accounts)
1479                },
1480            )?;
1481
1482            Ok(account_transitions)
1483        }
1484    }
1485}
1486
1487impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1488    fn storage_changeset(
1489        &self,
1490        block_number: BlockNumber,
1491    ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1492        if self.cached_storage_settings().storage_v2 {
1493            self.static_file_provider.storage_changeset(block_number)
1494        } else {
1495            let range = block_number..=block_number;
1496            let storage_range = BlockNumberAddress::range(range);
1497            self.tx
1498                .cursor_dup_read::<tables::StorageChangeSets>()?
1499                .walk_range(storage_range)?
1500                .map(|r| {
1501                    let (bna, entry) = r?;
1502                    Ok((
1503                        bna,
1504                        ChangesetEntry {
1505                            key: StorageSlotKey::plain(entry.key),
1506                            value: entry.value,
1507                        },
1508                    ))
1509                })
1510                .collect()
1511        }
1512    }
1513
1514    fn get_storage_before_block(
1515        &self,
1516        block_number: BlockNumber,
1517        address: Address,
1518        storage_key: B256,
1519    ) -> ProviderResult<Option<ChangesetEntry>> {
1520        if self.cached_storage_settings().storage_v2 {
1521            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1522        } else {
1523            Ok(self
1524                .tx
1525                .cursor_dup_read::<tables::StorageChangeSets>()?
1526                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1527                .filter(|entry| entry.key == storage_key)
1528                .map(|entry| ChangesetEntry {
1529                    key: StorageSlotKey::plain(entry.key),
1530                    value: entry.value,
1531                }))
1532        }
1533    }
1534
1535    fn storage_changesets_range(
1536        &self,
1537        range: impl RangeBounds<BlockNumber>,
1538    ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1539        if self.cached_storage_settings().storage_v2 {
1540            self.static_file_provider.storage_changesets_range(range)
1541        } else {
1542            self.tx
1543                .cursor_dup_read::<tables::StorageChangeSets>()?
1544                .walk_range(BlockNumberAddressRange::from(range))?
1545                .map(|r| {
1546                    let (bna, entry) = r?;
1547                    Ok((
1548                        bna,
1549                        ChangesetEntry {
1550                            key: StorageSlotKey::plain(entry.key),
1551                            value: entry.value,
1552                        },
1553                    ))
1554                })
1555                .collect()
1556        }
1557    }
1558
1559    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1560        if self.cached_storage_settings().storage_v2 {
1561            self.static_file_provider.storage_changeset_count()
1562        } else {
1563            Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1564        }
1565    }
1566}
1567
1568impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1569    fn account_block_changeset(
1570        &self,
1571        block_number: BlockNumber,
1572    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1573        if self.cached_storage_settings().storage_v2 {
1574            let static_changesets =
1575                self.static_file_provider.account_block_changeset(block_number)?;
1576            Ok(static_changesets)
1577        } else {
1578            let range = block_number..=block_number;
1579            self.tx
1580                .cursor_read::<tables::AccountChangeSets>()?
1581                .walk_range(range)?
1582                .map(|result| -> ProviderResult<_> {
1583                    let (_, account_before) = result?;
1584                    Ok(account_before)
1585                })
1586                .collect()
1587        }
1588    }
1589
1590    fn get_account_before_block(
1591        &self,
1592        block_number: BlockNumber,
1593        address: Address,
1594    ) -> ProviderResult<Option<AccountBeforeTx>> {
1595        if self.cached_storage_settings().storage_v2 {
1596            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1597        } else {
1598            self.tx
1599                .cursor_dup_read::<tables::AccountChangeSets>()?
1600                .seek_by_key_subkey(block_number, address)?
1601                .filter(|acc| acc.address == address)
1602                .map(Ok)
1603                .transpose()
1604        }
1605    }
1606
1607    fn account_changesets_range(
1608        &self,
1609        range: impl core::ops::RangeBounds<BlockNumber>,
1610    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1611        if self.cached_storage_settings().storage_v2 {
1612            self.static_file_provider.account_changesets_range(range)
1613        } else {
1614            self.tx
1615                .cursor_read::<tables::AccountChangeSets>()?
1616                .walk_range(to_range(range))?
1617                .map(|r| r.map_err(Into::into))
1618                .collect()
1619        }
1620    }
1621
1622    fn account_changeset_count(&self) -> ProviderResult<usize> {
1623        // check if account changesets are in static files, otherwise just count the changeset
1624        // entries in the DB
1625        if self.cached_storage_settings().storage_v2 {
1626            self.static_file_provider.account_changeset_count()
1627        } else {
1628            Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1629        }
1630    }
1631}
1632
1633impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1634    for DatabaseProvider<TX, N>
1635{
1636    type Header = HeaderTy<N>;
1637
1638    fn local_tip_header(
1639        &self,
1640        highest_uninterrupted_block: BlockNumber,
1641    ) -> ProviderResult<SealedHeader<Self::Header>> {
1642        let static_file_provider = self.static_file_provider();
1643
1644        // Make sure Headers static file is at the same height. If it's further, this
1645        // input execution was interrupted previously and we need to unwind the static file.
1646        let next_static_file_block_num = static_file_provider
1647            .get_highest_static_file_block(StaticFileSegment::Headers)
1648            .map(|id| id + 1)
1649            .unwrap_or_default();
1650        let next_block = highest_uninterrupted_block + 1;
1651
1652        match next_static_file_block_num.cmp(&next_block) {
1653            // The node shutdown between an executed static file commit and before the database
1654            // commit, so we need to unwind the static files.
1655            Ordering::Greater => {
1656                let mut static_file_producer =
1657                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1658                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1659                // Since this is a database <-> static file inconsistency, we commit the change
1660                // straight away.
1661                static_file_producer.commit()?
1662            }
1663            Ordering::Less => {
1664                // There's either missing or corrupted files.
1665                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1666            }
1667            Ordering::Equal => {}
1668        }
1669
1670        let local_head = static_file_provider
1671            .sealed_header(highest_uninterrupted_block)?
1672            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1673
1674        Ok(local_head)
1675    }
1676}
1677
1678impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1679    type Header = HeaderTy<N>;
1680
1681    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1682        if let Some(num) = self.block_number(block_hash)? {
1683            Ok(self.header_by_number(num)?)
1684        } else {
1685            Ok(None)
1686        }
1687    }
1688
1689    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1690        self.static_file_provider.header_by_number(num)
1691    }
1692
1693    fn headers_range(
1694        &self,
1695        range: impl RangeBounds<BlockNumber>,
1696    ) -> ProviderResult<Vec<Self::Header>> {
1697        self.static_file_provider.headers_range(range)
1698    }
1699
1700    fn sealed_header(
1701        &self,
1702        number: BlockNumber,
1703    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1704        self.static_file_provider.sealed_header(number)
1705    }
1706
1707    fn sealed_headers_while(
1708        &self,
1709        range: impl RangeBounds<BlockNumber>,
1710        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1711    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1712        self.static_file_provider.sealed_headers_while(range, predicate)
1713    }
1714}
1715
1716impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1717    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1718        self.static_file_provider.block_hash(number)
1719    }
1720
1721    fn canonical_hashes_range(
1722        &self,
1723        start: BlockNumber,
1724        end: BlockNumber,
1725    ) -> ProviderResult<Vec<B256>> {
1726        self.static_file_provider.canonical_hashes_range(start, end)
1727    }
1728}
1729
1730impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1731    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1732        let best_number = self.best_block_number()?;
1733        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1734        Ok(ChainInfo { best_hash, best_number })
1735    }
1736
1737    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1738        // The best block number is tracked via the finished stage which gets updated in the same tx
1739        // when new blocks committed
1740        Ok(self
1741            .get_stage_checkpoint(StageId::Finish)?
1742            .map(|checkpoint| checkpoint.block_number)
1743            .unwrap_or_default())
1744    }
1745
1746    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1747        self.static_file_provider.last_block_number()
1748    }
1749
1750    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1751        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1752    }
1753}
1754
1755impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1756    type Block = BlockTy<N>;
1757
1758    fn find_block_by_hash(
1759        &self,
1760        hash: B256,
1761        source: BlockSource,
1762    ) -> ProviderResult<Option<Self::Block>> {
1763        if source.is_canonical() {
1764            self.block(hash.into())
1765        } else {
1766            Ok(None)
1767        }
1768    }
1769
1770    /// Returns the block with matching number from database.
1771    ///
1772    /// If the header for this block is not found, this returns `None`.
1773    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1774    /// will return None.
1775    ///
1776    /// Returns an error if the requested block is below the earliest available history.
1777    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1778        if let Some(number) = self.convert_hash_or_number(id)? {
1779            let earliest_available = self.static_file_provider.earliest_history_height();
1780            if number < earliest_available {
1781                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1782            }
1783
1784            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1785
1786            // If the body indices are not found, this means that the transactions either do not
1787            // exist in the database yet, or they do exit but are not indexed.
1788            // If they exist but are not indexed, we don't have enough
1789            // information to return the block anyways, so we return `None`.
1790            let Some(transactions) = self.transactions_by_block(number.into())? else {
1791                return Ok(None)
1792            };
1793
1794            let body = self
1795                .storage
1796                .reader()
1797                .read_block_bodies(self, vec![(&header, transactions)])?
1798                .pop()
1799                .ok_or(ProviderError::InvalidStorageOutput)?;
1800
1801            return Ok(Some(Self::Block::new(header, body)))
1802        }
1803
1804        Ok(None)
1805    }
1806
1807    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1808        Ok(None)
1809    }
1810
1811    fn pending_block_and_receipts(
1812        &self,
1813    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1814        Ok(None)
1815    }
1816
1817    /// Returns the block with senders with matching number or hash from database.
1818    ///
1819    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1820    /// spot, and we want fast querying.**
1821    ///
1822    /// If the header for this block is not found, this returns `None`.
1823    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1824    /// will return None.
1825    fn recovered_block(
1826        &self,
1827        id: BlockHashOrNumber,
1828        transaction_kind: TransactionVariant,
1829    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1830        self.recovered_block(
1831            id,
1832            transaction_kind,
1833            |block_number| self.header_by_number(block_number),
1834            |header, body, senders| {
1835                Self::Block::new(header, body)
1836                    // Note: we're using unchecked here because we know the block contains valid txs
1837                    // wrt to its height and can ignore the s value check so pre
1838                    // EIP-2 txs are allowed
1839                    .try_into_recovered_unchecked(senders)
1840                    .map(Some)
1841                    .map_err(|_| ProviderError::SenderRecoveryError)
1842            },
1843        )
1844    }
1845
1846    fn sealed_block_with_senders(
1847        &self,
1848        id: BlockHashOrNumber,
1849        transaction_kind: TransactionVariant,
1850    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1851        self.recovered_block(
1852            id,
1853            transaction_kind,
1854            |block_number| self.sealed_header(block_number),
1855            |header, body, senders| {
1856                Self::Block::new_sealed(header, body)
1857                    // Note: we're using unchecked here because we know the block contains valid txs
1858                    // wrt to its height and can ignore the s value check so pre
1859                    // EIP-2 txs are allowed
1860                    .try_with_senders_unchecked(senders)
1861                    .map(Some)
1862                    .map_err(|_| ProviderError::SenderRecoveryError)
1863            },
1864        )
1865    }
1866
1867    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1868        self.block_range(
1869            range,
1870            |range| self.headers_range(range),
1871            |header, body, _| Ok(Self::Block::new(header, body)),
1872        )
1873    }
1874
1875    fn block_with_senders_range(
1876        &self,
1877        range: RangeInclusive<BlockNumber>,
1878    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1879        self.block_with_senders_range(
1880            range,
1881            |range| self.headers_range(range),
1882            |header, body, senders| {
1883                Self::Block::new(header, body)
1884                    .try_into_recovered_unchecked(senders)
1885                    .map_err(|_| ProviderError::SenderRecoveryError)
1886            },
1887        )
1888    }
1889
1890    fn recovered_block_range(
1891        &self,
1892        range: RangeInclusive<BlockNumber>,
1893    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1894        self.block_with_senders_range(
1895            range,
1896            |range| self.sealed_headers_range(range),
1897            |header, body, senders| {
1898                Self::Block::new_sealed(header, body)
1899                    .try_with_senders(senders)
1900                    .map_err(|_| ProviderError::SenderRecoveryError)
1901            },
1902        )
1903    }
1904
1905    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1906        Ok(self
1907            .tx
1908            .cursor_read::<tables::TransactionBlocks>()?
1909            .seek(id)
1910            .map(|b| b.map(|(_, bn)| bn))?)
1911    }
1912}
1913
1914impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1915    for DatabaseProvider<TX, N>
1916{
1917    /// Recovers transaction hashes by walking through `Transactions` table and
1918    /// calculating them in a parallel manner. Returned unsorted.
1919    fn transaction_hashes_by_range(
1920        &self,
1921        tx_range: Range<TxNumber>,
1922    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1923        self.static_file_provider.transaction_hashes_by_range(tx_range)
1924    }
1925}
1926
1927// Calculates the hash of the given transaction
1928impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1929    type Transaction = TxTy<N>;
1930
1931    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1932        self.with_rocksdb_tx(|tx_ref| {
1933            let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1934            reader.get_transaction_hash_number(tx_hash)
1935        })
1936    }
1937
1938    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1939        self.static_file_provider.transaction_by_id(id)
1940    }
1941
1942    fn transaction_by_id_unhashed(
1943        &self,
1944        id: TxNumber,
1945    ) -> ProviderResult<Option<Self::Transaction>> {
1946        self.static_file_provider.transaction_by_id_unhashed(id)
1947    }
1948
1949    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1950        if let Some(id) = self.transaction_id(hash)? {
1951            Ok(self.transaction_by_id_unhashed(id)?)
1952        } else {
1953            Ok(None)
1954        }
1955    }
1956
1957    fn transaction_by_hash_with_meta(
1958        &self,
1959        tx_hash: TxHash,
1960    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1961        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1962            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1963            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1964            let Some(sealed_header) = self.sealed_header(block_number)?
1965        {
1966            let (header, block_hash) = sealed_header.split();
1967            if let Some(block_body) = self.block_body_indices(block_number)? {
1968                // the index of the tx in the block is the offset:
1969                // len([start..tx_id])
1970                // NOTE: `transaction_id` is always `>=` the block's first
1971                // index
1972                let index = transaction_id - block_body.first_tx_num();
1973
1974                let meta = TransactionMeta {
1975                    tx_hash,
1976                    index,
1977                    block_hash,
1978                    block_number,
1979                    base_fee: header.base_fee_per_gas(),
1980                    excess_blob_gas: header.excess_blob_gas(),
1981                    timestamp: header.timestamp(),
1982                };
1983
1984                return Ok(Some((transaction, meta)))
1985            }
1986        }
1987
1988        Ok(None)
1989    }
1990
1991    fn transactions_by_block(
1992        &self,
1993        id: BlockHashOrNumber,
1994    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1995        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1996            let Some(body) = self.block_body_indices(block_number)?
1997        {
1998            let tx_range = body.tx_num_range();
1999            return if tx_range.is_empty() {
2000                Ok(Some(Vec::new()))
2001            } else {
2002                self.transactions_by_tx_range(tx_range).map(Some)
2003            }
2004        }
2005        Ok(None)
2006    }
2007
2008    fn transactions_by_block_range(
2009        &self,
2010        range: impl RangeBounds<BlockNumber>,
2011    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2012        let range = to_range(range);
2013
2014        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2015            .into_iter()
2016            .map(|body| {
2017                let tx_num_range = body.tx_num_range();
2018                if tx_num_range.is_empty() {
2019                    Ok(Vec::new())
2020                } else {
2021                    self.transactions_by_tx_range(tx_num_range)
2022                }
2023            })
2024            .collect()
2025    }
2026
2027    fn transactions_by_tx_range(
2028        &self,
2029        range: impl RangeBounds<TxNumber>,
2030    ) -> ProviderResult<Vec<Self::Transaction>> {
2031        self.static_file_provider.transactions_by_tx_range(range)
2032    }
2033
2034    fn senders_by_tx_range(
2035        &self,
2036        range: impl RangeBounds<TxNumber>,
2037    ) -> ProviderResult<Vec<Address>> {
2038        if EitherWriterDestination::senders(self).is_static_file() {
2039            self.static_file_provider.senders_by_tx_range(range)
2040        } else {
2041            self.cursor_read_collect::<tables::TransactionSenders>(range)
2042        }
2043    }
2044
2045    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2046        if EitherWriterDestination::senders(self).is_static_file() {
2047            self.static_file_provider.transaction_sender(id)
2048        } else {
2049            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2050        }
2051    }
2052}
2053
2054impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2055    type Receipt = ReceiptTy<N>;
2056
2057    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2058        self.static_file_provider.get_with_static_file_or_database(
2059            StaticFileSegment::Receipts,
2060            id,
2061            |static_file| static_file.receipt(id),
2062            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2063        )
2064    }
2065
2066    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2067        if let Some(id) = self.transaction_id(hash)? {
2068            self.receipt(id)
2069        } else {
2070            Ok(None)
2071        }
2072    }
2073
2074    fn receipts_by_block(
2075        &self,
2076        block: BlockHashOrNumber,
2077    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2078        if let Some(number) = self.convert_hash_or_number(block)? &&
2079            let Some(body) = self.block_body_indices(number)?
2080        {
2081            let tx_range = body.tx_num_range();
2082            return if tx_range.is_empty() {
2083                Ok(Some(Vec::new()))
2084            } else {
2085                self.receipts_by_tx_range(tx_range).map(Some)
2086            }
2087        }
2088        Ok(None)
2089    }
2090
2091    fn receipts_by_tx_range(
2092        &self,
2093        range: impl RangeBounds<TxNumber>,
2094    ) -> ProviderResult<Vec<Self::Receipt>> {
2095        self.static_file_provider.get_range_with_static_file_or_database(
2096            StaticFileSegment::Receipts,
2097            to_range(range),
2098            |static_file, range, _| static_file.receipts_by_tx_range(range),
2099            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2100            |_| true,
2101        )
2102    }
2103
2104    fn receipts_by_block_range(
2105        &self,
2106        block_range: RangeInclusive<BlockNumber>,
2107    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2108        if block_range.is_empty() {
2109            return Ok(Vec::new());
2110        }
2111
2112        // collect block body indices for each block in the range
2113        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2114        let mut block_body_indices = Vec::with_capacity(range_len);
2115        for block_num in block_range {
2116            if let Some(indices) = self.block_body_indices(block_num)? {
2117                block_body_indices.push(indices);
2118            } else {
2119                // use default indices for missing blocks (empty block)
2120                block_body_indices.push(StoredBlockBodyIndices::default());
2121            }
2122        }
2123
2124        if block_body_indices.is_empty() {
2125            return Ok(Vec::new());
2126        }
2127
2128        // find blocks with transactions to determine transaction range
2129        let non_empty_blocks: Vec<_> =
2130            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2131
2132        if non_empty_blocks.is_empty() {
2133            // all blocks are empty
2134            return Ok(vec![Vec::new(); block_body_indices.len()]);
2135        }
2136
2137        // calculate the overall transaction range
2138        let first_tx = non_empty_blocks[0].first_tx_num();
2139        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2140
2141        // fetch all receipts in the transaction range
2142        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2143        let mut receipts_iter = all_receipts.into_iter();
2144
2145        // distribute receipts to their respective blocks
2146        let mut result = Vec::with_capacity(block_body_indices.len());
2147        for indices in &block_body_indices {
2148            if indices.tx_count == 0 {
2149                result.push(Vec::new());
2150            } else {
2151                let block_receipts =
2152                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2153                result.push(block_receipts);
2154            }
2155        }
2156
2157        Ok(result)
2158    }
2159}
2160
2161impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2162    for DatabaseProvider<TX, N>
2163{
2164    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2165        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2166    }
2167
2168    fn block_body_indices_range(
2169        &self,
2170        range: RangeInclusive<BlockNumber>,
2171    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2172        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2173    }
2174}
2175
2176impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2177    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2178        Ok(if let Some(encoded) = id.get_pre_encoded() {
2179            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2180        } else {
2181            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2182        })
2183    }
2184
2185    /// Get stage checkpoint progress.
2186    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2187        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2188    }
2189
2190    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2191        self.tx
2192            .cursor_read::<tables::StageCheckpoints>()?
2193            .walk(None)?
2194            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2195            .map_err(ProviderError::Database)
2196    }
2197}
2198
2199impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2200    /// Save stage checkpoint.
2201    fn save_stage_checkpoint(
2202        &self,
2203        id: StageId,
2204        checkpoint: StageCheckpoint,
2205    ) -> ProviderResult<()> {
2206        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2207    }
2208
2209    /// Save stage checkpoint progress.
2210    fn save_stage_checkpoint_progress(
2211        &self,
2212        id: StageId,
2213        checkpoint: Vec<u8>,
2214    ) -> ProviderResult<()> {
2215        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2216    }
2217
2218    #[instrument(level = "debug", target = "providers::db", skip_all)]
2219    fn update_pipeline_stages(
2220        &self,
2221        block_number: BlockNumber,
2222        drop_stage_checkpoint: bool,
2223    ) -> ProviderResult<()> {
2224        // iterate over all existing stages in the table and update its progress.
2225        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2226        for stage_id in StageId::ALL {
2227            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2228            cursor.upsert(
2229                stage_id.to_string(),
2230                &StageCheckpoint {
2231                    block_number,
2232                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2233                },
2234            )?;
2235        }
2236
2237        Ok(())
2238    }
2239}
2240
2241impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2242    fn plain_state_storages(
2243        &self,
2244        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2245    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2246        if self.cached_storage_settings().use_hashed_state() {
2247            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2248
2249            addresses_with_keys
2250                .into_iter()
2251                .map(|(address, storage)| {
2252                    let hashed_address = keccak256(address);
2253                    storage
2254                        .into_iter()
2255                        .map(|key| -> ProviderResult<_> {
2256                            let hashed_key = keccak256(key);
2257                            let value = hashed_storage
2258                                .seek_by_key_subkey(hashed_address, hashed_key)?
2259                                .filter(|v| v.key == hashed_key)
2260                                .map(|v| v.value)
2261                                .unwrap_or_default();
2262                            Ok(StorageEntry { key, value })
2263                        })
2264                        .collect::<ProviderResult<Vec<_>>>()
2265                        .map(|storage| (address, storage))
2266                })
2267                .collect::<ProviderResult<Vec<(_, _)>>>()
2268        } else {
2269            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2270
2271            addresses_with_keys
2272                .into_iter()
2273                .map(|(address, storage)| {
2274                    storage
2275                        .into_iter()
2276                        .map(|key| -> ProviderResult<_> {
2277                            Ok(plain_storage
2278                                .seek_by_key_subkey(address, key)?
2279                                .filter(|v| v.key == key)
2280                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2281                        })
2282                        .collect::<ProviderResult<Vec<_>>>()
2283                        .map(|storage| (address, storage))
2284                })
2285                .collect::<ProviderResult<Vec<(_, _)>>>()
2286        }
2287    }
2288
2289    fn changed_storages_with_range(
2290        &self,
2291        range: RangeInclusive<BlockNumber>,
2292    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2293        if self.cached_storage_settings().storage_v2 {
2294            self.storage_changesets_range(range)?.into_iter().try_fold(
2295                BTreeMap::new(),
2296                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2297                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2298                    accounts.entry(address).or_default().insert(storage_entry.key.as_b256());
2299                    Ok(accounts)
2300                },
2301            )
2302        } else {
2303            self.tx
2304                .cursor_read::<tables::StorageChangeSets>()?
2305                .walk_range(BlockNumberAddress::range(range))?
2306                // fold all storages and save its old state so we can remove it from HashedStorage
2307                // it is needed as it is dup table.
2308                .try_fold(
2309                    BTreeMap::new(),
2310                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2311                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2312                        accounts.entry(address).or_default().insert(storage_entry.key);
2313                        Ok(accounts)
2314                    },
2315                )
2316        }
2317    }
2318
2319    fn changed_storages_and_blocks_with_range(
2320        &self,
2321        range: RangeInclusive<BlockNumber>,
2322    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2323        if self.cached_storage_settings().storage_v2 {
2324            self.storage_changesets_range(range)?.into_iter().try_fold(
2325                BTreeMap::new(),
2326                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2327                    storages
2328                        .entry((index.address(), storage.key.as_b256()))
2329                        .or_default()
2330                        .push(index.block_number());
2331                    Ok(storages)
2332                },
2333            )
2334        } else {
2335            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2336
2337            let storage_changeset_lists =
2338                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2339                    BTreeMap::new(),
2340                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2341                     entry|
2342                     -> ProviderResult<_> {
2343                        let (index, storage) = entry?;
2344                        storages
2345                            .entry((index.address(), storage.key))
2346                            .or_default()
2347                            .push(index.block_number());
2348                        Ok(storages)
2349                    },
2350                )?;
2351
2352            Ok(storage_changeset_lists)
2353        }
2354    }
2355}
2356
2357impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2358    for DatabaseProvider<TX, N>
2359{
2360    type Receipt = ReceiptTy<N>;
2361
2362    #[instrument(level = "debug", target = "providers::db", skip_all)]
2363    fn write_state<'a>(
2364        &self,
2365        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2366        is_value_known: OriginalValuesKnown,
2367        config: StateWriteConfig,
2368    ) -> ProviderResult<()> {
2369        let execution_outcome = execution_outcome.into();
2370        let first_block = execution_outcome.first_block();
2371
2372        let (plain_state, reverts) =
2373            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2374
2375        self.write_state_reverts(reverts, first_block, config)?;
2376        self.write_state_changes(plain_state)?;
2377
2378        if !config.write_receipts {
2379            return Ok(());
2380        }
2381
2382        let block_count = execution_outcome.len() as u64;
2383        let last_block = execution_outcome.last_block();
2384        let block_range = first_block..=last_block;
2385
2386        let tip = self.last_block_number()?.max(last_block);
2387
2388        // Fetch the first transaction number for each block in the range
2389        let block_indices: Vec<_> = self
2390            .block_body_indices_range(block_range)?
2391            .into_iter()
2392            .map(|b| b.first_tx_num)
2393            .collect();
2394
2395        // Ensure all expected blocks are present.
2396        if block_indices.len() < block_count as usize {
2397            let missing_blocks = block_count - block_indices.len() as u64;
2398            return Err(ProviderError::BlockBodyIndicesNotFound(
2399                last_block.saturating_sub(missing_blocks - 1),
2400            ));
2401        }
2402
2403        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2404
2405        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2406        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2407
2408        // All receipts from the last 128 blocks are required for blockchain tree, even with
2409        // [`PruneSegment::ContractLogs`].
2410        //
2411        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2412        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2413        // files yet. Once receipts exist in static files, we must continue writing to maintain
2414        // continuity and have no gaps.
2415        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2416            self.static_file_provider()
2417                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2418                .is_none()) &&
2419            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2420
2421        // Prepare set of addresses which logs should not be pruned.
2422        let mut allowed_addresses: AddressSet = AddressSet::default();
2423        for (_, addresses) in contract_log_pruner.range(..first_block) {
2424            allowed_addresses.extend(addresses.iter().copied());
2425        }
2426
2427        for (idx, (receipts, first_tx_index)) in
2428            execution_outcome.receipts().zip(block_indices).enumerate()
2429        {
2430            let block_number = first_block + idx as u64;
2431
2432            // Increment block number for receipts static file writer
2433            receipts_writer.increment_block(block_number)?;
2434
2435            // Skip writing receipts if pruning configuration requires us to.
2436            if prunable_receipts &&
2437                self.prune_modes
2438                    .receipts
2439                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2440            {
2441                continue
2442            }
2443
2444            // If there are new addresses to retain after this block number, track them
2445            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2446                allowed_addresses.extend(new_addresses.iter().copied());
2447            }
2448
2449            for (idx, receipt) in receipts.iter().enumerate() {
2450                let receipt_idx = first_tx_index + idx as u64;
2451                // Skip writing receipt if log filter is active and it does not have any logs to
2452                // retain
2453                if prunable_receipts &&
2454                    has_contract_log_filter &&
2455                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2456                {
2457                    continue
2458                }
2459
2460                receipts_writer.append_receipt(receipt_idx, receipt)?;
2461            }
2462        }
2463
2464        Ok(())
2465    }
2466
2467    fn write_state_reverts(
2468        &self,
2469        reverts: PlainStateReverts,
2470        first_block: BlockNumber,
2471        config: StateWriteConfig,
2472    ) -> ProviderResult<()> {
2473        let use_hashed_state = self.cached_storage_settings().use_hashed_state();
2474
2475        // Write storage changes
2476        if config.write_storage_changesets {
2477            tracing::trace!("Writing storage changes");
2478            let mut storages_cursor =
2479                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2480            let mut hashed_storages_cursor =
2481                self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2482            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2483                let block_number = first_block + block_index as BlockNumber;
2484
2485                tracing::trace!(block_number, "Writing block change");
2486                // sort changes by address.
2487                storage_changes.par_sort_unstable_by_key(|a| a.address);
2488                let total_changes =
2489                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2490                let mut changeset = Vec::with_capacity(total_changes);
2491                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2492                    let mut storage = storage_revert
2493                        .into_iter()
2494                        .map(|(k, v)| {
2495                            (StorageSlotKey::from_u256(k).to_changeset_key(use_hashed_state), v)
2496                        })
2497                        .collect::<Vec<_>>();
2498                    // sort storage slots by key.
2499                    storage.par_sort_unstable_by_key(|a| a.0);
2500
2501                    // If we are writing the primary storage wipe transition, the pre-existing
2502                    // storage state has to be taken from the database and written to storage
2503                    // history. See [StorageWipe::Primary] for more details.
2504                    //
2505                    // When `use_hashed_state` is enabled, we read from `HashedStorages`
2506                    // instead of `PlainStorageState`. The hashed entries already have
2507                    // `keccak256(slot)` keys which is exactly the format needed for hashed
2508                    // changesets (static file changesets always use hashed keys when
2509                    // `use_hashed_state` is true).
2510                    //
2511                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
2512                    // require collecting wiped entries into a Vec like this, see
2513                    // `write_storage_trie_changesets`.
2514                    let mut wiped_storage = Vec::new();
2515                    if wiped {
2516                        tracing::trace!(?address, "Wiping storage");
2517                        if use_hashed_state {
2518                            let hashed_address = keccak256(address);
2519                            if let Some((_, entry)) =
2520                                hashed_storages_cursor.seek_exact(hashed_address)?
2521                            {
2522                                wiped_storage.push((entry.key, entry.value));
2523                                while let Some(entry) = hashed_storages_cursor.next_dup_val()? {
2524                                    wiped_storage.push((entry.key, entry.value))
2525                                }
2526                            }
2527                        } else if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2528                            wiped_storage.push((entry.key, entry.value));
2529                            while let Some(entry) = storages_cursor.next_dup_val()? {
2530                                wiped_storage.push((entry.key, entry.value))
2531                            }
2532                        }
2533                    }
2534
2535                    tracing::trace!(?address, ?storage, "Writing storage reverts");
2536                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2537                        changeset.push(StorageBeforeTx { address, key, value });
2538                    }
2539                }
2540
2541                let mut storage_changesets_writer =
2542                    EitherWriter::new_storage_changesets(self, block_number)?;
2543                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2544            }
2545        }
2546
2547        if !config.write_account_changesets {
2548            return Ok(());
2549        }
2550
2551        // Write account changes
2552        tracing::trace!(?first_block, "Writing account changes");
2553        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2554            let block_number = first_block + block_index as BlockNumber;
2555            let changeset = account_block_reverts
2556                .into_iter()
2557                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2558                .collect::<Vec<_>>();
2559            let mut account_changesets_writer =
2560                EitherWriter::new_account_changesets(self, block_number)?;
2561
2562            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2563        }
2564
2565        Ok(())
2566    }
2567
2568    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2569        // sort all entries so they can be written to database in more performant way.
2570        // and take smaller memory footprint.
2571        changes.accounts.par_sort_by_key(|a| a.0);
2572        changes.storage.par_sort_by_key(|a| a.address);
2573        changes.contracts.par_sort_by_key(|a| a.0);
2574
2575        // When use_hashed_state is enabled, skip plain state writes for accounts and storage.
2576        // The hashed state is already written by the separate `write_hashed_state()` call.
2577        // Bytecode writes remain unconditional since Bytecodes is not a plain/hashed table.
2578        if !self.cached_storage_settings().use_hashed_state() {
2579            // Write new account state
2580            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2581            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2582            // write account to database.
2583            for (address, account) in changes.accounts {
2584                if let Some(account) = account {
2585                    tracing::trace!(?address, "Updating plain state account");
2586                    accounts_cursor.upsert(address, &account.into())?;
2587                } else if accounts_cursor.seek_exact(address)?.is_some() {
2588                    tracing::trace!(?address, "Deleting plain state account");
2589                    accounts_cursor.delete_current()?;
2590                }
2591            }
2592
2593            // Write new storage state and wipe storage if needed.
2594            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2595            let mut storages_cursor =
2596                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2597            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2598                // Wiping of storage.
2599                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2600                    storages_cursor.delete_current_duplicates()?;
2601                }
2602                // cast storages to B256.
2603                let mut storage = storage
2604                    .into_iter()
2605                    .map(|(k, value)| StorageEntry { key: k.into(), value })
2606                    .collect::<Vec<_>>();
2607                // sort storage slots by key.
2608                storage.par_sort_unstable_by_key(|a| a.key);
2609
2610                for entry in storage {
2611                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2612                    if let Some(db_entry) =
2613                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2614                        db_entry.key == entry.key
2615                    {
2616                        storages_cursor.delete_current()?;
2617                    }
2618
2619                    if !entry.value.is_zero() {
2620                        storages_cursor.upsert(address, &entry)?;
2621                    }
2622                }
2623            }
2624        }
2625
2626        // Write bytecode
2627        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2628        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
2629        for (hash, bytecode) in changes.contracts {
2630            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
2631        }
2632
2633        Ok(())
2634    }
2635
2636    #[instrument(level = "debug", target = "providers::db", skip_all)]
2637    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2638        // Write hashed account updates.
2639        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2640        for (hashed_address, account) in hashed_state.accounts() {
2641            if let Some(account) = account {
2642                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2643            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2644                hashed_accounts_cursor.delete_current()?;
2645            }
2646        }
2647
2648        // Write hashed storage changes.
2649        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2650        let mut hashed_storage_cursor =
2651            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2652        for (hashed_address, storage) in sorted_storages {
2653            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2654                hashed_storage_cursor.delete_current_duplicates()?;
2655            }
2656
2657            for (hashed_slot, value) in storage.storage_slots_ref() {
2658                let entry = StorageEntry { key: *hashed_slot, value: *value };
2659
2660                if let Some(db_entry) =
2661                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2662                    db_entry.key == entry.key
2663                {
2664                    hashed_storage_cursor.delete_current()?;
2665                }
2666
2667                if !entry.value.is_zero() {
2668                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2669                }
2670            }
2671        }
2672
2673        Ok(())
2674    }
2675
2676    /// Remove the last N blocks of state.
2677    ///
2678    /// The latest state will be unwound
2679    ///
2680    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2681    ///    transaction ids.
2682    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2683    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2684    ///    the changesets.
2685    ///    - In order to have both the old and new values in the changesets, we also access the
2686    ///      plain state tables.
2687    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2688    ///    we:
2689    ///     1. Take the old value from the changeset
2690    ///     2. Take the new value from the plain state
2691    ///     3. Save the old value to the local state
2692    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2693    ///    have seen before we:
2694    ///     1. Take the old value from the changeset
2695    ///     2. Take the new value from the local state
2696    ///     3. Set the local state to the value in the changeset
2697    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2698        let range = block + 1..=self.last_block_number()?;
2699
2700        if range.is_empty() {
2701            return Ok(());
2702        }
2703
2704        // We are not removing block meta as it is used to get block changesets.
2705        let block_bodies = self.block_body_indices_range(range.clone())?;
2706
2707        // get transaction receipts
2708        let from_transaction_num =
2709            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2710
2711        let storage_range = BlockNumberAddress::range(range.clone());
2712        let storage_changeset = if self.cached_storage_settings().storage_v2 {
2713            let changesets = self.storage_changesets_range(range.clone())?;
2714            let mut changeset_writer =
2715                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2716            changeset_writer.prune_storage_changesets(block)?;
2717            changesets
2718        } else {
2719            self.take::<tables::StorageChangeSets>(storage_range)?
2720                .into_iter()
2721                .map(|(k, v)| {
2722                    (k, ChangesetEntry { key: StorageSlotKey::plain(v.key), value: v.value })
2723                })
2724                .collect()
2725        };
2726        let account_changeset = if self.cached_storage_settings().storage_v2 {
2727            let changesets = self.account_changesets_range(range)?;
2728            let mut changeset_writer =
2729                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2730            changeset_writer.prune_account_changesets(block)?;
2731            changesets
2732        } else {
2733            self.take::<tables::AccountChangeSets>(range)?
2734        };
2735
2736        if self.cached_storage_settings().use_hashed_state() {
2737            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2738            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2739
2740            let (state, _) = self.populate_bundle_state_hashed(
2741                account_changeset,
2742                storage_changeset,
2743                &mut hashed_accounts_cursor,
2744                &mut hashed_storage_cursor,
2745            )?;
2746
2747            for (address, (old_account, new_account, storage)) in &state {
2748                if old_account != new_account {
2749                    let hashed_address = keccak256(address);
2750                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2751                    if let Some(account) = old_account {
2752                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2753                    } else if existing_entry.is_some() {
2754                        hashed_accounts_cursor.delete_current()?;
2755                    }
2756                }
2757
2758                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2759                    let hashed_address = keccak256(address);
2760                    let storage_entry =
2761                        StorageEntry { key: *storage_key, value: *old_storage_value };
2762                    if hashed_storage_cursor
2763                        .seek_by_key_subkey(hashed_address, *storage_key)?
2764                        .filter(|s| s.key == *storage_key)
2765                        .is_some()
2766                    {
2767                        hashed_storage_cursor.delete_current()?
2768                    }
2769
2770                    if !old_storage_value.is_zero() {
2771                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2772                    }
2773                }
2774            }
2775        } else {
2776            // This is not working for blocks that are not at tip. as plain state is not the last
2777            // state of end range. We should rename the functions or add support to access
2778            // History state. Accessing history state can be tricky but we are not gaining
2779            // anything.
2780            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2781            let mut plain_storage_cursor =
2782                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2783
2784            let (state, _) = self.populate_bundle_state(
2785                account_changeset,
2786                storage_changeset,
2787                &mut plain_accounts_cursor,
2788                &mut plain_storage_cursor,
2789            )?;
2790
2791            for (address, (old_account, new_account, storage)) in &state {
2792                if old_account != new_account {
2793                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2794                    if let Some(account) = old_account {
2795                        plain_accounts_cursor.upsert(*address, account)?;
2796                    } else if existing_entry.is_some() {
2797                        plain_accounts_cursor.delete_current()?;
2798                    }
2799                }
2800
2801                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2802                    let storage_entry =
2803                        StorageEntry { key: *storage_key, value: *old_storage_value };
2804                    if plain_storage_cursor
2805                        .seek_by_key_subkey(*address, *storage_key)?
2806                        .filter(|s| s.key == *storage_key)
2807                        .is_some()
2808                    {
2809                        plain_storage_cursor.delete_current()?
2810                    }
2811
2812                    if !old_storage_value.is_zero() {
2813                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2814                    }
2815                }
2816            }
2817        }
2818
2819        self.remove_receipts_from(from_transaction_num, block)?;
2820
2821        Ok(())
2822    }
2823
2824    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2825    ///
2826    /// The latest state will be unwound and returned back with all the blocks
2827    ///
2828    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2829    ///    transaction ids.
2830    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2831    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2832    ///    the changesets.
2833    ///    - In order to have both the old and new values in the changesets, we also access the
2834    ///      plain state tables.
2835    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2836    ///    we:
2837    ///     1. Take the old value from the changeset
2838    ///     2. Take the new value from the plain state
2839    ///     3. Save the old value to the local state
2840    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2841    ///    have seen before we:
2842    ///     1. Take the old value from the changeset
2843    ///     2. Take the new value from the local state
2844    ///     3. Set the local state to the value in the changeset
2845    fn take_state_above(
2846        &self,
2847        block: BlockNumber,
2848    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2849        let range = block + 1..=self.last_block_number()?;
2850
2851        if range.is_empty() {
2852            return Ok(ExecutionOutcome::default())
2853        }
2854        let start_block_number = *range.start();
2855
2856        // We are not removing block meta as it is used to get block changesets.
2857        let block_bodies = self.block_body_indices_range(range.clone())?;
2858
2859        // get transaction receipts
2860        let from_transaction_num =
2861            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2862        let to_transaction_num =
2863            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2864
2865        let storage_range = BlockNumberAddress::range(range.clone());
2866        let storage_changeset = if let Some(highest_block) = self
2867            .static_file_provider
2868            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2869            self.cached_storage_settings().storage_v2
2870        {
2871            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2872            let mut changeset_writer =
2873                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2874            changeset_writer.prune_storage_changesets(block)?;
2875            changesets
2876        } else {
2877            self.take::<tables::StorageChangeSets>(storage_range)?
2878                .into_iter()
2879                .map(|(k, v)| {
2880                    (k, ChangesetEntry { key: StorageSlotKey::plain(v.key), value: v.value })
2881                })
2882                .collect()
2883        };
2884
2885        // if there are static files for this segment, prune them.
2886        let highest_changeset_block = self
2887            .static_file_provider
2888            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2889        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2890            self.cached_storage_settings().storage_v2
2891        {
2892            // TODO: add a `take` method that removes and returns the items instead of doing this
2893            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2894            let mut changeset_writer =
2895                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2896            changeset_writer.prune_account_changesets(block)?;
2897
2898            changesets
2899        } else {
2900            // Have to remove from static files if they exist, otherwise remove using `take` for the
2901            // changeset tables
2902            self.take::<tables::AccountChangeSets>(range)?
2903        };
2904
2905        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2906            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2907            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2908
2909            let (state, reverts) = self.populate_bundle_state_hashed(
2910                account_changeset,
2911                storage_changeset,
2912                &mut hashed_accounts_cursor,
2913                &mut hashed_storage_cursor,
2914            )?;
2915
2916            for (address, (old_account, new_account, storage)) in &state {
2917                if old_account != new_account {
2918                    let hashed_address = keccak256(address);
2919                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2920                    if let Some(account) = old_account {
2921                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2922                    } else if existing_entry.is_some() {
2923                        hashed_accounts_cursor.delete_current()?;
2924                    }
2925                }
2926
2927                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2928                    let hashed_address = keccak256(address);
2929                    let storage_entry =
2930                        StorageEntry { key: *storage_key, value: *old_storage_value };
2931                    if hashed_storage_cursor
2932                        .seek_by_key_subkey(hashed_address, *storage_key)?
2933                        .filter(|s| s.key == *storage_key)
2934                        .is_some()
2935                    {
2936                        hashed_storage_cursor.delete_current()?
2937                    }
2938
2939                    if !old_storage_value.is_zero() {
2940                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2941                    }
2942                }
2943            }
2944
2945            (state, reverts)
2946        } else {
2947            // This is not working for blocks that are not at tip. as plain state is not the last
2948            // state of end range. We should rename the functions or add support to access
2949            // History state. Accessing history state can be tricky but we are not gaining
2950            // anything.
2951            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2952            let mut plain_storage_cursor =
2953                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2954
2955            let (state, reverts) = self.populate_bundle_state(
2956                account_changeset,
2957                storage_changeset,
2958                &mut plain_accounts_cursor,
2959                &mut plain_storage_cursor,
2960            )?;
2961
2962            for (address, (old_account, new_account, storage)) in &state {
2963                if old_account != new_account {
2964                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2965                    if let Some(account) = old_account {
2966                        plain_accounts_cursor.upsert(*address, account)?;
2967                    } else if existing_entry.is_some() {
2968                        plain_accounts_cursor.delete_current()?;
2969                    }
2970                }
2971
2972                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2973                    let storage_entry =
2974                        StorageEntry { key: *storage_key, value: *old_storage_value };
2975                    if plain_storage_cursor
2976                        .seek_by_key_subkey(*address, *storage_key)?
2977                        .filter(|s| s.key == *storage_key)
2978                        .is_some()
2979                    {
2980                        plain_storage_cursor.delete_current()?
2981                    }
2982
2983                    if !old_storage_value.is_zero() {
2984                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2985                    }
2986                }
2987            }
2988
2989            (state, reverts)
2990        };
2991
2992        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2993        let mut receipts_iter = self
2994            .static_file_provider
2995            .get_range_with_static_file_or_database(
2996                StaticFileSegment::Receipts,
2997                from_transaction_num..to_transaction_num + 1,
2998                |static_file, range, _| {
2999                    static_file
3000                        .receipts_by_tx_range(range.clone())
3001                        .map(|r| range.into_iter().zip(r).collect())
3002                },
3003                |range, _| {
3004                    self.tx
3005                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
3006                        .walk_range(range)?
3007                        .map(|r| r.map_err(Into::into))
3008                        .collect()
3009                },
3010                |_| true,
3011            )?
3012            .into_iter()
3013            .peekable();
3014
3015        let mut receipts = Vec::with_capacity(block_bodies.len());
3016        // loop break if we are at the end of the blocks.
3017        for block_body in block_bodies {
3018            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3019            for num in block_body.tx_num_range() {
3020                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3021                    block_receipts.push(receipts_iter.next().unwrap().1);
3022                }
3023            }
3024            receipts.push(block_receipts);
3025        }
3026
3027        self.remove_receipts_from(from_transaction_num, block)?;
3028
3029        Ok(ExecutionOutcome::new_init(
3030            state,
3031            reverts,
3032            Vec::new(),
3033            receipts,
3034            start_block_number,
3035            Vec::new(),
3036        ))
3037    }
3038}
3039
3040impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3041    /// Writes trie updates to the database with already sorted updates.
3042    ///
3043    /// Returns the number of entries modified.
3044    #[instrument(level = "debug", target = "providers::db", skip_all)]
3045    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3046        if trie_updates.is_empty() {
3047            return Ok(0)
3048        }
3049
3050        // Track the number of inserted entries.
3051        let mut num_entries = 0;
3052
3053        let tx = self.tx_ref();
3054        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
3055
3056        // Process sorted account nodes
3057        for (key, updated_node) in trie_updates.account_nodes_ref() {
3058            let nibbles = StoredNibbles(*key);
3059            match updated_node {
3060                Some(node) => {
3061                    if !nibbles.0.is_empty() {
3062                        num_entries += 1;
3063                        account_trie_cursor.upsert(nibbles, node)?;
3064                    }
3065                }
3066                None => {
3067                    num_entries += 1;
3068                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3069                        account_trie_cursor.delete_current()?;
3070                    }
3071                }
3072            }
3073        }
3074
3075        num_entries +=
3076            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3077
3078        Ok(num_entries)
3079    }
3080}
3081
3082impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3083    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3084    ///
3085    /// Expects the storage trie updates to already be sorted by the hashed address key.
3086    ///
3087    /// Returns the number of entries modified.
3088    fn write_storage_trie_updates_sorted<'a>(
3089        &self,
3090        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3091    ) -> ProviderResult<usize> {
3092        let mut num_entries = 0;
3093        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3094        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3095        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
3096        for (hashed_address, storage_trie_updates) in storage_tries {
3097            let mut db_storage_trie_cursor =
3098                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3099            num_entries +=
3100                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3101            cursor = db_storage_trie_cursor.cursor;
3102        }
3103
3104        Ok(num_entries)
3105    }
3106}
3107
3108impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3109    fn unwind_account_hashing<'a>(
3110        &self,
3111        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3112    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3113        // Aggregate all block changesets and make a list of accounts that have been changed.
3114        // Note that collecting and then reversing the order is necessary to ensure that the
3115        // changes are applied in the correct order.
3116        let hashed_accounts = changesets
3117            .into_iter()
3118            .map(|(_, e)| (keccak256(e.address), e.info))
3119            .collect::<Vec<_>>()
3120            .into_iter()
3121            .rev()
3122            .collect::<BTreeMap<_, _>>();
3123
3124        // Apply values to HashedState, and remove the account if it's None.
3125        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3126        for (hashed_address, account) in &hashed_accounts {
3127            if let Some(account) = account {
3128                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3129            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3130                hashed_accounts_cursor.delete_current()?;
3131            }
3132        }
3133
3134        Ok(hashed_accounts)
3135    }
3136
3137    fn unwind_account_hashing_range(
3138        &self,
3139        range: impl RangeBounds<BlockNumber>,
3140    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3141        let changesets = self.account_changesets_range(range)?;
3142        self.unwind_account_hashing(changesets.iter())
3143    }
3144
3145    fn insert_account_for_hashing(
3146        &self,
3147        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3148    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3149        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3150        let hashed_accounts =
3151            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3152        for (hashed_address, account) in &hashed_accounts {
3153            if let Some(account) = account {
3154                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3155            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3156                hashed_accounts_cursor.delete_current()?;
3157            }
3158        }
3159        Ok(hashed_accounts)
3160    }
3161
3162    fn unwind_storage_hashing(
3163        &self,
3164        changesets: impl Iterator<Item = (BlockNumberAddress, ChangesetEntry)>,
3165    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3166        // Aggregate all block changesets and make list of accounts that have been changed.
3167        let mut hashed_storages = changesets
3168            .into_iter()
3169            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3170                let hashed_key = storage_entry.key.to_hashed();
3171                (keccak256(address), hashed_key, storage_entry.value)
3172            })
3173            .collect::<Vec<_>>();
3174        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3175
3176        // Apply values to HashedState, and remove the account if it's None.
3177        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3178            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3179        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3180        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3181            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3182
3183            if hashed_storage
3184                .seek_by_key_subkey(hashed_address, key)?
3185                .filter(|entry| entry.key == key)
3186                .is_some()
3187            {
3188                hashed_storage.delete_current()?;
3189            }
3190
3191            if !value.is_zero() {
3192                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3193            }
3194        }
3195        Ok(hashed_storage_keys)
3196    }
3197
3198    fn unwind_storage_hashing_range(
3199        &self,
3200        range: impl RangeBounds<BlockNumber>,
3201    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3202        let changesets = self.storage_changesets_range(range)?;
3203        self.unwind_storage_hashing(changesets.into_iter())
3204    }
3205
3206    fn insert_storage_for_hashing(
3207        &self,
3208        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3209    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3210        // hash values
3211        let hashed_storages =
3212            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3213                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3214                    map.insert(keccak256(entry.key), entry.value);
3215                    map
3216                });
3217                map.insert(keccak256(address), storage);
3218                map
3219            });
3220
3221        let hashed_storage_keys = hashed_storages
3222            .iter()
3223            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3224            .collect();
3225
3226        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3227        // Hash the address and key and apply them to HashedStorage (if Storage is None
3228        // just remove it);
3229        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3230            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3231                if hashed_storage_cursor
3232                    .seek_by_key_subkey(hashed_address, key)?
3233                    .filter(|entry| entry.key == key)
3234                    .is_some()
3235                {
3236                    hashed_storage_cursor.delete_current()?;
3237                }
3238
3239                if !value.is_zero() {
3240                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3241                }
3242                Ok(())
3243            })
3244        })?;
3245
3246        Ok(hashed_storage_keys)
3247    }
3248}
3249
3250impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3251    fn unwind_account_history_indices<'a>(
3252        &self,
3253        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3254    ) -> ProviderResult<usize> {
3255        let mut last_indices = changesets
3256            .into_iter()
3257            .map(|(index, account)| (account.address, *index))
3258            .collect::<Vec<_>>();
3259        last_indices.sort_unstable_by_key(|(a, _)| *a);
3260
3261        if self.cached_storage_settings().storage_v2 {
3262            #[cfg(all(unix, feature = "rocksdb"))]
3263            {
3264                let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3265                self.pending_rocksdb_batches.lock().push(batch);
3266            }
3267        } else {
3268            // Unwind the account history index in MDBX.
3269            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3270            for &(address, rem_index) in &last_indices {
3271                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3272                    &mut cursor,
3273                    ShardedKey::last(address),
3274                    rem_index,
3275                    |sharded_key| sharded_key.key == address,
3276                )?;
3277
3278                // Check the last returned partial shard.
3279                // If it's not empty, the shard needs to be reinserted.
3280                if !partial_shard.is_empty() {
3281                    cursor.insert(
3282                        ShardedKey::last(address),
3283                        &BlockNumberList::new_pre_sorted(partial_shard),
3284                    )?;
3285                }
3286            }
3287        }
3288
3289        let changesets = last_indices.len();
3290        Ok(changesets)
3291    }
3292
3293    fn unwind_account_history_indices_range(
3294        &self,
3295        range: impl RangeBounds<BlockNumber>,
3296    ) -> ProviderResult<usize> {
3297        let changesets = self.account_changesets_range(range)?;
3298        self.unwind_account_history_indices(changesets.iter())
3299    }
3300
3301    fn insert_account_history_index(
3302        &self,
3303        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3304    ) -> ProviderResult<()> {
3305        self.append_history_index::<_, tables::AccountsHistory>(
3306            account_transitions,
3307            ShardedKey::new,
3308        )
3309    }
3310
3311    fn unwind_storage_history_indices(
3312        &self,
3313        changesets: impl Iterator<Item = (BlockNumberAddress, ChangesetEntry)>,
3314    ) -> ProviderResult<usize> {
3315        let mut storage_changesets = changesets
3316            .into_iter()
3317            .map(|(BlockNumberAddress((bn, address)), storage)| {
3318                (address, storage.key.as_b256(), bn)
3319            })
3320            .collect::<Vec<_>>();
3321        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
3322
3323        if self.cached_storage_settings().storage_v2 {
3324            #[cfg(all(unix, feature = "rocksdb"))]
3325            {
3326                let batch =
3327                    self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3328                self.pending_rocksdb_batches.lock().push(batch);
3329            }
3330        } else {
3331            // Unwind the storage history index in MDBX.
3332            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3333            for &(address, storage_key, rem_index) in &storage_changesets {
3334                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3335                    &mut cursor,
3336                    StorageShardedKey::last(address, storage_key),
3337                    rem_index,
3338                    |storage_sharded_key| {
3339                        storage_sharded_key.address == address &&
3340                            storage_sharded_key.sharded_key.key == storage_key
3341                    },
3342                )?;
3343
3344                // Check the last returned partial shard.
3345                // If it's not empty, the shard needs to be reinserted.
3346                if !partial_shard.is_empty() {
3347                    cursor.insert(
3348                        StorageShardedKey::last(address, storage_key),
3349                        &BlockNumberList::new_pre_sorted(partial_shard),
3350                    )?;
3351                }
3352            }
3353        }
3354
3355        let changesets = storage_changesets.len();
3356        Ok(changesets)
3357    }
3358
3359    fn unwind_storage_history_indices_range(
3360        &self,
3361        range: impl RangeBounds<BlockNumber>,
3362    ) -> ProviderResult<usize> {
3363        let changesets = self.storage_changesets_range(range)?;
3364        self.unwind_storage_history_indices(changesets.into_iter())
3365    }
3366
3367    fn insert_storage_history_index(
3368        &self,
3369        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3370    ) -> ProviderResult<()> {
3371        self.append_history_index::<_, tables::StoragesHistory>(
3372            storage_transitions,
3373            |(address, storage_key), highest_block_number| {
3374                StorageShardedKey::new(address, storage_key, highest_block_number)
3375            },
3376        )
3377    }
3378
3379    #[instrument(level = "debug", target = "providers::db", skip_all)]
3380    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3381        let storage_settings = self.cached_storage_settings();
3382        if !storage_settings.storage_v2 {
3383            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3384            self.insert_account_history_index(indices)?;
3385        }
3386
3387        if !storage_settings.storage_v2 {
3388            let indices = self.changed_storages_and_blocks_with_range(range)?;
3389            self.insert_storage_history_index(indices)?;
3390        }
3391
3392        Ok(())
3393    }
3394}
3395
3396impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3397    for DatabaseProvider<TX, N>
3398{
3399    fn take_block_and_execution_above(
3400        &self,
3401        block: BlockNumber,
3402    ) -> ProviderResult<Chain<Self::Primitives>> {
3403        let range = block + 1..=self.last_block_number()?;
3404
3405        self.unwind_trie_state_from(block + 1)?;
3406
3407        // get execution res
3408        let execution_state = self.take_state_above(block)?;
3409
3410        let blocks = self.recovered_block_range(range)?;
3411
3412        // remove block bodies it is needed for both get block range and get block execution results
3413        // that is why it is deleted afterwards.
3414        self.remove_blocks_above(block)?;
3415
3416        // Update pipeline progress
3417        self.update_pipeline_stages(block, true)?;
3418
3419        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3420    }
3421
3422    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3423        self.unwind_trie_state_from(block + 1)?;
3424
3425        // remove execution res
3426        self.remove_state_above(block)?;
3427
3428        // remove block bodies it is needed for both get block range and get block execution results
3429        // that is why it is deleted afterwards.
3430        self.remove_blocks_above(block)?;
3431
3432        // Update pipeline progress
3433        self.update_pipeline_stages(block, true)?;
3434
3435        Ok(())
3436    }
3437}
3438
3439impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3440    for DatabaseProvider<TX, N>
3441{
3442    type Block = BlockTy<N>;
3443    type Receipt = ReceiptTy<N>;
3444
3445    /// Inserts the block into the database, writing to both static files and MDBX.
3446    ///
3447    /// This is a convenience method primarily used in tests. For production use,
3448    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3449    fn insert_block(
3450        &self,
3451        block: &RecoveredBlock<Self::Block>,
3452    ) -> ProviderResult<StoredBlockBodyIndices> {
3453        let block_number = block.number();
3454
3455        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3456        let executed_block = ExecutedBlock::new(
3457            Arc::new(block.clone()),
3458            Arc::new(BlockExecutionOutput {
3459                result: BlockExecutionResult {
3460                    receipts: Default::default(),
3461                    requests: Default::default(),
3462                    gas_used: 0,
3463                    blob_gas_used: 0,
3464                },
3465                state: Default::default(),
3466            }),
3467            ComputedTrieData::default(),
3468        );
3469
3470        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3471        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3472
3473        // Return the body indices
3474        self.block_body_indices(block_number)?
3475            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3476    }
3477
3478    fn append_block_bodies(
3479        &self,
3480        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3481    ) -> ProviderResult<()> {
3482        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3483
3484        // Initialize writer if we will be writing transactions to staticfiles
3485        let mut tx_writer =
3486            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3487
3488        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3489        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3490
3491        // Get id for the next tx_num or zero if there are no transactions.
3492        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3493
3494        for (block_number, body) in &bodies {
3495            // Increment block on static file header.
3496            tx_writer.increment_block(*block_number)?;
3497
3498            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3499            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3500
3501            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3502
3503            // insert block meta
3504            block_indices_cursor.append(*block_number, &block_indices)?;
3505
3506            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3507
3508            let Some(body) = body else { continue };
3509
3510            // write transaction block index
3511            if !body.transactions().is_empty() {
3512                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3513                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3514            }
3515
3516            // write transactions
3517            for transaction in body.transactions() {
3518                tx_writer.append_transaction(next_tx_num, transaction)?;
3519
3520                // Increment transaction id for each transaction.
3521                next_tx_num += 1;
3522            }
3523        }
3524
3525        self.storage.writer().write_block_bodies(self, bodies)?;
3526
3527        Ok(())
3528    }
3529
3530    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3531        let last_block_number = self.last_block_number()?;
3532        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3533        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3534            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3535        }
3536
3537        // Get highest static file block for the total block range
3538        let highest_static_file_block = self
3539            .static_file_provider()
3540            .get_highest_static_file_block(StaticFileSegment::Headers)
3541            .expect("todo: error handling, headers should exist");
3542
3543        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3544        // we remove only what is ABOVE the block.
3545        //
3546        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3547        // will have three blocks to remove, which will be block 8, 7, and 6.
3548        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3549        self.static_file_provider()
3550            .get_writer(block, StaticFileSegment::Headers)?
3551            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3552
3553        // First transaction to be removed
3554        let unwind_tx_from = self
3555            .block_body_indices(block)?
3556            .map(|b| b.next_tx_num())
3557            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3558
3559        // Last transaction to be removed
3560        let unwind_tx_to = self
3561            .tx
3562            .cursor_read::<tables::BlockBodyIndices>()?
3563            .last()?
3564            // shouldn't happen because this was OK above
3565            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3566            .1
3567            .last_tx_num();
3568
3569        if unwind_tx_from <= unwind_tx_to {
3570            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3571            self.with_rocksdb_batch(|batch| {
3572                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3573                for (hash, _) in hashes {
3574                    writer.delete_transaction_hash_number(hash)?;
3575                }
3576                Ok(((), writer.into_raw_rocksdb_batch()))
3577            })?;
3578        }
3579
3580        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
3581
3582        self.remove_bodies_above(block)?;
3583
3584        Ok(())
3585    }
3586
3587    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3588        self.storage.writer().remove_block_bodies_above(self, block)?;
3589
3590        // First transaction to be removed
3591        let unwind_tx_from = self
3592            .block_body_indices(block)?
3593            .map(|b| b.next_tx_num())
3594            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3595
3596        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3597        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3598
3599        let static_file_tx_num =
3600            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3601
3602        let to_delete = static_file_tx_num
3603            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3604            .unwrap_or_default();
3605
3606        self.static_file_provider
3607            .latest_writer(StaticFileSegment::Transactions)?
3608            .prune_transactions(to_delete, block)?;
3609
3610        Ok(())
3611    }
3612
3613    /// Appends blocks with their execution state to the database.
3614    ///
3615    /// **Note:** This function is only used in tests.
3616    ///
3617    /// History indices are written to the appropriate backend based on storage settings:
3618    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3619    ///
3620    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3621    fn append_blocks_with_state(
3622        &self,
3623        blocks: Vec<RecoveredBlock<Self::Block>>,
3624        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3625        hashed_state: HashedPostStateSorted,
3626    ) -> ProviderResult<()> {
3627        if blocks.is_empty() {
3628            debug!(target: "providers::db", "Attempted to append empty block range");
3629            return Ok(())
3630        }
3631
3632        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3633        // `None`.
3634        let first_number = blocks[0].number();
3635
3636        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3637        // `None`.
3638        let last_block_number = blocks[blocks.len() - 1].number();
3639
3640        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3641
3642        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3643        // This is necessary because with edge storage, changesets are written to static files
3644        // whose index isn't updated until commit, making them invisible to subsequent reads
3645        // within the same transaction.
3646        let use_hashed = self.cached_storage_settings().use_hashed_state();
3647        let (account_transitions, storage_transitions) = {
3648            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3649            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3650            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3651                let block_number = first_number + block_idx as u64;
3652                for (address, account_revert) in block_reverts {
3653                    account_transitions.entry(*address).or_default().push(block_number);
3654                    for storage_key in account_revert.storage.keys() {
3655                        let key =
3656                            StorageSlotKey::from_u256(*storage_key).to_changeset_key(use_hashed);
3657                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3658                    }
3659                }
3660            }
3661            (account_transitions, storage_transitions)
3662        };
3663
3664        // Insert the blocks
3665        for block in blocks {
3666            self.insert_block(&block)?;
3667            durations_recorder.record_relative(metrics::Action::InsertBlock);
3668        }
3669
3670        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3671        durations_recorder.record_relative(metrics::Action::InsertState);
3672
3673        // insert hashes and intermediate merkle nodes
3674        self.write_hashed_state(&hashed_state)?;
3675        durations_recorder.record_relative(metrics::Action::InsertHashes);
3676
3677        // Use pre-computed transitions for history indices since static file
3678        // writes aren't visible until commit.
3679        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3680        // append_*_history_shard which handles read-merge-write internally.
3681        let storage_settings = self.cached_storage_settings();
3682        if storage_settings.storage_v2 {
3683            #[cfg(all(unix, feature = "rocksdb"))]
3684            self.with_rocksdb_batch(|mut batch| {
3685                for (address, blocks) in account_transitions {
3686                    batch.append_account_history_shard(address, blocks)?;
3687                }
3688                Ok(((), Some(batch.into_inner())))
3689            })?;
3690        } else {
3691            self.insert_account_history_index(account_transitions)?;
3692        }
3693        if storage_settings.storage_v2 {
3694            #[cfg(all(unix, feature = "rocksdb"))]
3695            self.with_rocksdb_batch(|mut batch| {
3696                for ((address, key), blocks) in storage_transitions {
3697                    batch.append_storage_history_shard(address, key, blocks)?;
3698                }
3699                Ok(((), Some(batch.into_inner())))
3700            })?;
3701        } else {
3702            self.insert_storage_history_index(storage_transitions)?;
3703        }
3704        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3705
3706        // Update pipeline progress
3707        self.update_pipeline_stages(last_block_number, false)?;
3708        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3709
3710        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3711
3712        Ok(())
3713    }
3714}
3715
3716impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3717    fn get_prune_checkpoint(
3718        &self,
3719        segment: PruneSegment,
3720    ) -> ProviderResult<Option<PruneCheckpoint>> {
3721        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3722    }
3723
3724    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3725        Ok(PruneSegment::variants()
3726            .filter_map(|segment| {
3727                self.tx
3728                    .get::<tables::PruneCheckpoints>(segment)
3729                    .transpose()
3730                    .map(|chk| chk.map(|chk| (segment, chk)))
3731            })
3732            .collect::<Result<_, _>>()?)
3733    }
3734}
3735
3736impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3737    fn save_prune_checkpoint(
3738        &self,
3739        segment: PruneSegment,
3740        checkpoint: PruneCheckpoint,
3741    ) -> ProviderResult<()> {
3742        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3743    }
3744}
3745
3746impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3747    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3748        let db_entries = self.tx.entries::<T>()?;
3749        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3750            Ok(entries) => entries,
3751            Err(ProviderError::UnsupportedProvider) => 0,
3752            Err(err) => return Err(err),
3753        };
3754
3755        Ok(db_entries + static_file_entries)
3756    }
3757}
3758
3759impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3760    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3761        let mut finalized_blocks = self
3762            .tx
3763            .cursor_read::<tables::ChainState>()?
3764            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3765            .take(1)
3766            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3767
3768        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3769        Ok(last_finalized_block_number)
3770    }
3771
3772    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3773        let mut finalized_blocks = self
3774            .tx
3775            .cursor_read::<tables::ChainState>()?
3776            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3777            .take(1)
3778            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3779
3780        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3781        Ok(last_finalized_block_number)
3782    }
3783}
3784
3785impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3786    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3787        Ok(self
3788            .tx
3789            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3790    }
3791
3792    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3793        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3794    }
3795}
3796
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    type Tx = TX;

    /// Returns a shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns a mutable reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the underlying database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns the prune modes configured for this provider.
    fn prune_modes_ref(&self) -> &PruneModes {
        // NOTE(review): presumably this resolves to an inherent `prune_modes_ref` on
        // `DatabaseProvider` (inherent methods take precedence over trait methods); if no such
        // inherent method exists this call would recurse forever — confirm.
        self.prune_modes_ref()
    }

    /// Commit database transaction, static files, and pending `RocksDB` batches.
    ///
    /// The order of the three commits depends on the kind of operation:
    /// - unwind (an unwind is queued on the static file provider, or the commit order is
    ///   unwind): database transaction, then pending `RocksDB` batches, then static files;
    /// - otherwise: static files are finalized first, then pending `RocksDB` batches are
    ///   committed, then the database transaction. Per-backend timings are recorded in the
    ///   provider metrics on this path only.
    ///
    /// The `RocksDB` steps are only compiled in on unix with the `rocksdb` feature.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the commit steps; later steps are not run.
    #[instrument(
        name = "DatabaseProvider::commit",
        level = "debug",
        target = "providers::db",
        skip_all
    )]
    fn commit(self) -> ProviderResult<()> {
        // For unwinding it makes more sense to commit the database first, since if
        // it is interrupted before the static files commit, we can just
        // truncate the static files according to the
        // checkpoints on the next start-up.
        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
            self.tx.commit()?;

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                // Drain the queued batches (leaving the pending list empty) and commit each.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
            }

            self.static_file_provider.commit()?;
        } else {
            // Normal path: finalize() will call sync_all() if not already synced
            let mut timings = metrics::CommitTimings::default();

            let start = Instant::now();
            self.static_file_provider.finalize()?;
            timings.sf = start.elapsed();

            #[cfg(all(unix, feature = "rocksdb"))]
            {
                let start = Instant::now();
                // Drain the queued batches (leaving the pending list empty) and commit each.
                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
                for batch in batches {
                    self.rocksdb_provider.commit_batch(batch)?;
                }
                timings.rocksdb = start.elapsed();
            }

            // The database transaction is committed last on this path.
            let start = Instant::now();
            self.tx.commit()?;
            timings.mdbx = start.elapsed();

            self.metrics.record_commit(&timings);
        }

        Ok(())
    }
}
3868
3869impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3870    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3871        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3872    }
3873}
3874
3875impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3876    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3877        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3878    }
3879}
3880
3881impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3882    fn cached_storage_settings(&self) -> StorageSettings {
3883        *self.storage_settings.read()
3884    }
3885
3886    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3887        *self.storage_settings.write() = settings;
3888    }
3889}
3890
3891#[cfg(test)]
3892mod tests {
3893    use super::*;
3894    use crate::{
3895        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3896        BlockWriter,
3897    };
3898    use alloy_consensus::Header;
3899    use alloy_primitives::{
3900        map::{AddressMap, B256Map},
3901        U256,
3902    };
3903    use reth_chain_state::ExecutedBlock;
3904    use reth_ethereum_primitives::Receipt;
3905    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3906    use reth_primitives_traits::SealedBlock;
3907    use reth_testing_utils::generators::{self, random_block, BlockParams};
3908    use reth_trie::{HashedPostState, KeccakKeyHasher, Nibbles, StoredNibblesSubKey};
3909    use revm_database::BundleState;
3910    use revm_state::AccountInfo;
3911
3912    #[test]
3913    fn test_receipts_by_block_range_empty_range() {
3914        let factory = create_test_provider_factory();
3915        let provider = factory.provider().unwrap();
3916
3917        // empty range should return empty vec
3918        let start = 10u64;
3919        let end = 9u64;
3920        let result = provider.receipts_by_block_range(start..=end).unwrap();
3921        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3922    }
3923
3924    #[test]
3925    fn test_receipts_by_block_range_nonexistent_blocks() {
3926        let factory = create_test_provider_factory();
3927        let provider = factory.provider().unwrap();
3928
3929        // non-existent blocks should return empty vecs for each block
3930        let result = provider.receipts_by_block_range(10..=12).unwrap();
3931        assert_eq!(result, vec![vec![], vec![], vec![]]);
3932    }
3933
3934    #[test]
3935    fn test_receipts_by_block_range_single_block() {
3936        let factory = create_test_provider_factory();
3937        let data = BlockchainTestData::default();
3938
3939        let provider_rw = factory.provider_rw().unwrap();
3940        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3941        provider_rw
3942            .write_state(
3943                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3944                crate::OriginalValuesKnown::No,
3945                StateWriteConfig::default(),
3946            )
3947            .unwrap();
3948        provider_rw.insert_block(&data.blocks[0].0).unwrap();
3949        provider_rw
3950            .write_state(
3951                &data.blocks[0].1,
3952                crate::OriginalValuesKnown::No,
3953                StateWriteConfig::default(),
3954            )
3955            .unwrap();
3956        provider_rw.commit().unwrap();
3957
3958        let provider = factory.provider().unwrap();
3959        let result = provider.receipts_by_block_range(1..=1).unwrap();
3960
3961        // should have one vec with one receipt
3962        assert_eq!(result.len(), 1);
3963        assert_eq!(result[0].len(), 1);
3964        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3965    }
3966
3967    #[test]
3968    fn test_receipts_by_block_range_multiple_blocks() {
3969        let factory = create_test_provider_factory();
3970        let data = BlockchainTestData::default();
3971
3972        let provider_rw = factory.provider_rw().unwrap();
3973        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
3974        provider_rw
3975            .write_state(
3976                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3977                crate::OriginalValuesKnown::No,
3978                StateWriteConfig::default(),
3979            )
3980            .unwrap();
3981        for i in 0..3 {
3982            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3983            provider_rw
3984                .write_state(
3985                    &data.blocks[i].1,
3986                    crate::OriginalValuesKnown::No,
3987                    StateWriteConfig::default(),
3988                )
3989                .unwrap();
3990        }
3991        provider_rw.commit().unwrap();
3992
3993        let provider = factory.provider().unwrap();
3994        let result = provider.receipts_by_block_range(1..=3).unwrap();
3995
3996        // should have 3 vecs, each with one receipt
3997        assert_eq!(result.len(), 3);
3998        for (i, block_receipts) in result.iter().enumerate() {
3999            assert_eq!(block_receipts.len(), 1);
4000            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4001        }
4002    }
4003
4004    #[test]
4005    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4006        let factory = create_test_provider_factory();
4007        let data = BlockchainTestData::default();
4008
4009        let provider_rw = factory.provider_rw().unwrap();
4010        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4011        provider_rw
4012            .write_state(
4013                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4014                crate::OriginalValuesKnown::No,
4015                StateWriteConfig::default(),
4016            )
4017            .unwrap();
4018
4019        // insert blocks 1-3 with receipts
4020        for i in 0..3 {
4021            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4022            provider_rw
4023                .write_state(
4024                    &data.blocks[i].1,
4025                    crate::OriginalValuesKnown::No,
4026                    StateWriteConfig::default(),
4027                )
4028                .unwrap();
4029        }
4030        provider_rw.commit().unwrap();
4031
4032        let provider = factory.provider().unwrap();
4033        let result = provider.receipts_by_block_range(1..=3).unwrap();
4034
4035        // verify each block has one receipt
4036        assert_eq!(result.len(), 3);
4037        for block_receipts in &result {
4038            assert_eq!(block_receipts.len(), 1);
4039        }
4040    }
4041
4042    #[test]
4043    fn test_receipts_by_block_range_partial_range() {
4044        let factory = create_test_provider_factory();
4045        let data = BlockchainTestData::default();
4046
4047        let provider_rw = factory.provider_rw().unwrap();
4048        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4049        provider_rw
4050            .write_state(
4051                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4052                crate::OriginalValuesKnown::No,
4053                StateWriteConfig::default(),
4054            )
4055            .unwrap();
4056        for i in 0..3 {
4057            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4058            provider_rw
4059                .write_state(
4060                    &data.blocks[i].1,
4061                    crate::OriginalValuesKnown::No,
4062                    StateWriteConfig::default(),
4063                )
4064                .unwrap();
4065        }
4066        provider_rw.commit().unwrap();
4067
4068        let provider = factory.provider().unwrap();
4069
4070        // request range that includes both existing and non-existing blocks
4071        let result = provider.receipts_by_block_range(2..=5).unwrap();
4072        assert_eq!(result.len(), 4);
4073
4074        // blocks 2-3 should have receipts, blocks 4-5 should be empty
4075        assert_eq!(result[0].len(), 1); // block 2
4076        assert_eq!(result[1].len(), 1); // block 3
4077        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
4078        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
4079
4080        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4081        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4082    }
4083
4084    #[test]
4085    fn test_receipts_by_block_range_all_empty_blocks() {
4086        let factory = create_test_provider_factory();
4087        let mut rng = generators::rng();
4088
4089        // create blocks with no transactions
4090        let mut blocks = Vec::new();
4091        for i in 0..3 {
4092            let block =
4093                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4094            blocks.push(block);
4095        }
4096
4097        let provider_rw = factory.provider_rw().unwrap();
4098        for block in blocks {
4099            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4100        }
4101        provider_rw.commit().unwrap();
4102
4103        let provider = factory.provider().unwrap();
4104        let result = provider.receipts_by_block_range(1..=3).unwrap();
4105
4106        assert_eq!(result.len(), 3);
4107        for block_receipts in result {
4108            assert_eq!(block_receipts.len(), 0);
4109        }
4110    }
4111
4112    #[test]
4113    fn test_receipts_by_block_range_consistency_with_individual_calls() {
4114        let factory = create_test_provider_factory();
4115        let data = BlockchainTestData::default();
4116
4117        let provider_rw = factory.provider_rw().unwrap();
4118        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4119        provider_rw
4120            .write_state(
4121                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4122                crate::OriginalValuesKnown::No,
4123                StateWriteConfig::default(),
4124            )
4125            .unwrap();
4126        for i in 0..3 {
4127            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4128            provider_rw
4129                .write_state(
4130                    &data.blocks[i].1,
4131                    crate::OriginalValuesKnown::No,
4132                    StateWriteConfig::default(),
4133                )
4134                .unwrap();
4135        }
4136        provider_rw.commit().unwrap();
4137
4138        let provider = factory.provider().unwrap();
4139
4140        // get receipts using block range method
4141        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4142
4143        // get receipts using individual block calls
4144        let mut individual_results = Vec::new();
4145        for block_num in 1..=3 {
4146            let receipts =
4147                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4148            individual_results.push(receipts);
4149        }
4150
4151        assert_eq!(range_result, individual_results);
4152    }
4153
4154    #[test]
4155    fn test_write_trie_updates_sorted() {
4156        use reth_trie::{
4157            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4158            BranchNodeCompact, StorageTrieEntry,
4159        };
4160
4161        let factory = create_test_provider_factory();
4162        let provider_rw = factory.provider_rw().unwrap();
4163
4164        // Pre-populate account trie with data that will be deleted
4165        {
4166            let tx = provider_rw.tx_ref();
4167            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4168
4169            // Add account node that will be deleted
4170            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4171            cursor
4172                .upsert(
4173                    to_delete,
4174                    &BranchNodeCompact::new(
4175                        0b1010_1010_1010_1010, // state_mask
4176                        0b0000_0000_0000_0000, // tree_mask
4177                        0b0000_0000_0000_0000, // hash_mask
4178                        vec![],
4179                        None,
4180                    ),
4181                )
4182                .unwrap();
4183
4184            // Add account node that will be updated
4185            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4186            cursor
4187                .upsert(
4188                    to_update,
4189                    &BranchNodeCompact::new(
4190                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4191                        0b0000_0000_0000_0000, // tree_mask
4192                        0b0000_0000_0000_0000, // hash_mask
4193                        vec![],
4194                        None,
4195                    ),
4196                )
4197                .unwrap();
4198        }
4199
4200        // Pre-populate storage tries with data
4201        let storage_address1 = B256::from([1u8; 32]);
4202        let storage_address2 = B256::from([2u8; 32]);
4203        {
4204            let tx = provider_rw.tx_ref();
4205            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4206
4207            // Add storage nodes for address1 (one will be deleted)
4208            storage_cursor
4209                .upsert(
4210                    storage_address1,
4211                    &StorageTrieEntry {
4212                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4213                        node: BranchNodeCompact::new(
4214                            0b0011_0011_0011_0011, // will be deleted
4215                            0b0000_0000_0000_0000,
4216                            0b0000_0000_0000_0000,
4217                            vec![],
4218                            None,
4219                        ),
4220                    },
4221                )
4222                .unwrap();
4223
4224            // Add storage nodes for address2 (will be wiped)
4225            storage_cursor
4226                .upsert(
4227                    storage_address2,
4228                    &StorageTrieEntry {
4229                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4230                        node: BranchNodeCompact::new(
4231                            0b1100_1100_1100_1100, // will be wiped
4232                            0b0000_0000_0000_0000,
4233                            0b0000_0000_0000_0000,
4234                            vec![],
4235                            None,
4236                        ),
4237                    },
4238                )
4239                .unwrap();
4240            storage_cursor
4241                .upsert(
4242                    storage_address2,
4243                    &StorageTrieEntry {
4244                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4245                        node: BranchNodeCompact::new(
4246                            0b0011_1100_0011_1100, // will be wiped
4247                            0b0000_0000_0000_0000,
4248                            0b0000_0000_0000_0000,
4249                            vec![],
4250                            None,
4251                        ),
4252                    },
4253                )
4254                .unwrap();
4255        }
4256
4257        // Create sorted account trie updates
4258        let account_nodes = vec![
4259            (
4260                Nibbles::from_nibbles([0x1, 0x2]),
4261                Some(BranchNodeCompact::new(
4262                    0b1111_1111_1111_1111, // state_mask (updated)
4263                    0b0000_0000_0000_0000, // tree_mask
4264                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4265                    vec![],
4266                    None,
4267                )),
4268            ),
4269            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4270            (
4271                Nibbles::from_nibbles([0x5, 0x6]),
4272                Some(BranchNodeCompact::new(
4273                    0b1111_1111_1111_1111, // state_mask
4274                    0b0000_0000_0000_0000, // tree_mask
4275                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4276                    vec![],
4277                    None,
4278                )),
4279            ),
4280        ];
4281
4282        // Create sorted storage trie updates
4283        let storage_trie1 = StorageTrieUpdatesSorted {
4284            is_deleted: false,
4285            storage_nodes: vec![
4286                (
4287                    Nibbles::from_nibbles([0x1, 0x0]),
4288                    Some(BranchNodeCompact::new(
4289                        0b1111_0000_0000_0000, // state_mask
4290                        0b0000_0000_0000_0000, // tree_mask
4291                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4292                        vec![],
4293                        None,
4294                    )),
4295                ),
4296                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4297            ],
4298        };
4299
4300        let storage_trie2 = StorageTrieUpdatesSorted {
4301            is_deleted: true, // Wipe all storage for this address
4302            storage_nodes: vec![],
4303        };
4304
4305        let mut storage_tries = B256Map::default();
4306        storage_tries.insert(storage_address1, storage_trie1);
4307        storage_tries.insert(storage_address2, storage_trie2);
4308
4309        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4310
4311        // Write the sorted trie updates
4312        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4313
4314        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4315        // storage deletion = 5
4316        assert_eq!(num_entries, 5);
4317
4318        // Verify account trie updates were written correctly
4319        let tx = provider_rw.tx_ref();
4320        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4321
4322        // Check first account node was updated
4323        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4324        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4325        assert!(entry1.is_some(), "Updated account node should exist");
4326        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4327        assert_eq!(
4328            entry1.unwrap().1.state_mask,
4329            expected_mask,
4330            "Account node should have updated state_mask"
4331        );
4332
4333        // Check deleted account node no longer exists
4334        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4335        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4336        assert!(entry2.is_none(), "Deleted account node should not exist");
4337
4338        // Check new account node exists
4339        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4340        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4341        assert!(entry3.is_some(), "New account node should exist");
4342
4343        // Verify storage trie updates were written correctly
4344        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4345
4346        // Check storage for address1
4347        let storage_entries1: Vec<_> = storage_cursor
4348            .walk_dup(Some(storage_address1), None)
4349            .unwrap()
4350            .collect::<Result<Vec<_>, _>>()
4351            .unwrap();
4352        assert_eq!(
4353            storage_entries1.len(),
4354            1,
4355            "Storage address1 should have 1 entry after deletion"
4356        );
4357        assert_eq!(
4358            storage_entries1[0].1.nibbles.0,
4359            Nibbles::from_nibbles([0x1, 0x0]),
4360            "Remaining entry should be [0x1, 0x0]"
4361        );
4362
4363        // Check storage for address2 was wiped
4364        let storage_entries2: Vec<_> = storage_cursor
4365            .walk_dup(Some(storage_address2), None)
4366            .unwrap()
4367            .collect::<Result<Vec<_>, _>>()
4368            .unwrap();
4369        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4370
4371        provider_rw.commit().unwrap();
4372    }
4373
4374    #[test]
4375    fn test_prunable_receipts_logic() {
4376        let insert_blocks =
4377            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4378                let mut rng = generators::rng();
4379                for block_num in 0..=tip_block {
4380                    let block = random_block(
4381                        &mut rng,
4382                        block_num,
4383                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4384                    );
4385                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4386                }
4387            };
4388
4389        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4390            let outcome = ExecutionOutcome {
4391                first_block: block,
4392                receipts: vec![vec![Receipt {
4393                    tx_type: Default::default(),
4394                    success: true,
4395                    cumulative_gas_used: block, // identifier to assert against
4396                    logs: vec![],
4397                }]],
4398                ..Default::default()
4399            };
4400            provider_rw
4401                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4402                .unwrap();
4403            provider_rw.commit().unwrap();
4404        };
4405
4406        // Legacy mode (receipts in DB) - should be prunable
4407        {
4408            let factory = create_test_provider_factory();
4409            let storage_settings = StorageSettings::v1();
4410            factory.set_storage_settings_cache(storage_settings);
4411            let factory = factory.with_prune_modes(PruneModes {
4412                receipts: Some(PruneMode::Before(100)),
4413                ..Default::default()
4414            });
4415
4416            let tip_block = 200u64;
4417            let first_block = 1u64;
4418
4419            // create chain
4420            let provider_rw = factory.provider_rw().unwrap();
4421            insert_blocks(&provider_rw, tip_block, 1);
4422            provider_rw.commit().unwrap();
4423
4424            write_receipts(
4425                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4426                first_block,
4427            );
4428            write_receipts(
4429                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4430                tip_block - 1,
4431            );
4432
4433            let provider = factory.provider().unwrap();
4434
4435            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4436                assert!(provider
4437                    .receipts_by_block(block.into())
4438                    .unwrap()
4439                    .is_some_and(|r| r.len() == num_receipts));
4440            }
4441        }
4442
4443        // Static files mode
4444        {
4445            let factory = create_test_provider_factory();
4446            let storage_settings = StorageSettings::v2();
4447            factory.set_storage_settings_cache(storage_settings);
4448            let factory = factory.with_prune_modes(PruneModes {
4449                receipts: Some(PruneMode::Before(2)),
4450                ..Default::default()
4451            });
4452
4453            let tip_block = 200u64;
4454
4455            // create chain
4456            let provider_rw = factory.provider_rw().unwrap();
4457            insert_blocks(&provider_rw, tip_block, 1);
4458            provider_rw.commit().unwrap();
4459
4460            // Attempt to write receipts for block 0 and 1 (should be skipped)
4461            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4462            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4463
4464            assert!(factory
4465                .static_file_provider()
4466                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4467                .is_none(),);
4468            assert!(factory
4469                .static_file_provider()
4470                .get_highest_static_file_block(StaticFileSegment::Receipts)
4471                .is_some_and(|b| b == 1),);
4472
4473            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4474            // static files.
4475            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4476            assert!(factory
4477                .static_file_provider()
4478                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4479                .is_some_and(|num| num == 2),);
4480
4481            // After having a receipt already in static files, attempt to skip the next receipt by
4482            // changing the prune mode. It should NOT skip it and should still write the receipt,
4483            // since static files do not support gaps.
4484            let factory = factory.with_prune_modes(PruneModes {
4485                receipts: Some(PruneMode::Before(100)),
4486                ..Default::default()
4487            });
4488            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4489            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4490            write_receipts(provider_rw, 3);
4491
4492            // Ensure we can only fetch the 2 last receipts.
4493            //
4494            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4495            // to the block number it belongs to easily identify and assert.
4496            let provider = factory.provider().unwrap();
4497            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4498            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4499                assert!(provider
4500                    .receipts_by_block(num.into())
4501                    .unwrap()
4502                    .is_some_and(|r| r.len() == num_receipts));
4503
4504                let receipt = provider.receipt(num).unwrap();
4505                if num_receipts > 0 {
4506                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4507                } else {
4508                    assert!(receipt.is_none());
4509                }
4510            }
4511        }
4512    }
4513
4514    #[test]
4515    fn test_try_into_history_rejects_unexecuted_blocks() {
4516        use reth_storage_api::TryIntoHistoricalStateProvider;
4517
4518        let factory = create_test_provider_factory();
4519
4520        // Insert genesis block to have some data
4521        let data = BlockchainTestData::default();
4522        let provider_rw = factory.provider_rw().unwrap();
4523        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4524        provider_rw
4525            .write_state(
4526                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4527                crate::OriginalValuesKnown::No,
4528                StateWriteConfig::default(),
4529            )
4530            .unwrap();
4531        provider_rw.commit().unwrap();
4532
4533        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4534        let provider = factory.provider().unwrap();
4535
4536        // Requesting historical state for block 0 (executed) should succeed
4537        let result = provider.try_into_history_at_block(0);
4538        assert!(result.is_ok(), "Block 0 should be available");
4539
4540        // Get another provider and request state for block 100 (not executed)
4541        let provider = factory.provider().unwrap();
4542        let result = provider.try_into_history_at_block(100);
4543
4544        // Should fail with BlockNotExecuted error
4545        match result {
4546            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4547            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4548            Ok(_) => panic!("Expected error, got Ok"),
4549        }
4550    }
4551
4552    #[test]
4553    fn test_unwind_storage_hashing_with_hashed_state() {
4554        let factory = create_test_provider_factory();
4555        let storage_settings = StorageSettings::v2();
4556        factory.set_storage_settings_cache(storage_settings);
4557
4558        let address = Address::random();
4559        let hashed_address = keccak256(address);
4560
4561        let slot_key_already_hashed = B256::random();
4562
4563        let current_value = U256::from(100);
4564        let old_value = U256::from(42);
4565
4566        let provider_rw = factory.provider_rw().unwrap();
4567        provider_rw
4568            .tx
4569            .cursor_dup_write::<tables::HashedStorages>()
4570            .unwrap()
4571            .upsert(
4572                hashed_address,
4573                &StorageEntry { key: slot_key_already_hashed, value: current_value },
4574            )
4575            .unwrap();
4576
4577        let changesets = vec![(
4578            BlockNumberAddress((1, address)),
4579            ChangesetEntry {
4580                key: StorageSlotKey::Hashed(slot_key_already_hashed),
4581                value: old_value,
4582            },
4583        )];
4584
4585        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4586
4587        assert_eq!(result.len(), 1);
4588        assert!(result.contains_key(&hashed_address));
4589        assert!(result[&hashed_address].contains(&slot_key_already_hashed));
4590
4591        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4592        let entry = cursor
4593            .seek_by_key_subkey(hashed_address, slot_key_already_hashed)
4594            .unwrap()
4595            .expect("entry should exist");
4596        assert_eq!(entry.key, slot_key_already_hashed);
4597        assert_eq!(entry.value, old_value);
4598    }
4599
4600    #[test]
4601    fn test_write_and_remove_state_roundtrip_legacy() {
4602        let factory = create_test_provider_factory();
4603        let storage_settings = StorageSettings::v1();
4604        assert!(!storage_settings.use_hashed_state());
4605        factory.set_storage_settings_cache(storage_settings);
4606
4607        let address = Address::with_last_byte(1);
4608        let hashed_address = keccak256(address);
4609        let slot = U256::from(5);
4610        let slot_key = B256::from(slot);
4611        let hashed_slot = keccak256(slot_key);
4612
4613        let mut rng = generators::rng();
4614        let block0 =
4615            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4616        let block1 =
4617            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4618
4619        {
4620            let provider_rw = factory.provider_rw().unwrap();
4621            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4622            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4623            provider_rw
4624                .tx
4625                .cursor_write::<tables::PlainAccountState>()
4626                .unwrap()
4627                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4628                .unwrap();
4629            provider_rw.commit().unwrap();
4630        }
4631
4632        let provider_rw = factory.provider_rw().unwrap();
4633
4634        let mut state_init: BundleStateInit = AddressMap::default();
4635        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4636        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4637        state_init.insert(
4638            address,
4639            (
4640                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4641                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4642                storage_map,
4643            ),
4644        );
4645
4646        let mut reverts_init: RevertsInit = HashMap::default();
4647        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4648        block_reverts.insert(
4649            address,
4650            (
4651                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4652                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4653            ),
4654        );
4655        reverts_init.insert(1, block_reverts);
4656
4657        let execution_outcome =
4658            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4659
4660        provider_rw
4661            .write_state(
4662                &execution_outcome,
4663                OriginalValuesKnown::Yes,
4664                StateWriteConfig {
4665                    write_receipts: false,
4666                    write_account_changesets: true,
4667                    write_storage_changesets: true,
4668                },
4669            )
4670            .unwrap();
4671
4672        let hashed_state =
4673            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4674        provider_rw.write_hashed_state(&hashed_state).unwrap();
4675
4676        let account = provider_rw
4677            .tx
4678            .cursor_read::<tables::PlainAccountState>()
4679            .unwrap()
4680            .seek_exact(address)
4681            .unwrap()
4682            .unwrap()
4683            .1;
4684        assert_eq!(account.nonce, 1);
4685
4686        let storage_entry = provider_rw
4687            .tx
4688            .cursor_dup_read::<tables::PlainStorageState>()
4689            .unwrap()
4690            .seek_by_key_subkey(address, slot_key)
4691            .unwrap()
4692            .unwrap();
4693        assert_eq!(storage_entry.key, slot_key);
4694        assert_eq!(storage_entry.value, U256::from(10));
4695
4696        let hashed_entry = provider_rw
4697            .tx
4698            .cursor_dup_read::<tables::HashedStorages>()
4699            .unwrap()
4700            .seek_by_key_subkey(hashed_address, hashed_slot)
4701            .unwrap()
4702            .unwrap();
4703        assert_eq!(hashed_entry.key, hashed_slot);
4704        assert_eq!(hashed_entry.value, U256::from(10));
4705
4706        let account_cs_entries = provider_rw
4707            .tx
4708            .cursor_dup_read::<tables::AccountChangeSets>()
4709            .unwrap()
4710            .walk(Some(1))
4711            .unwrap()
4712            .collect::<Result<Vec<_>, _>>()
4713            .unwrap();
4714        assert!(!account_cs_entries.is_empty());
4715
4716        let storage_cs_entries = provider_rw
4717            .tx
4718            .cursor_read::<tables::StorageChangeSets>()
4719            .unwrap()
4720            .walk(Some(BlockNumberAddress((1, address))))
4721            .unwrap()
4722            .collect::<Result<Vec<_>, _>>()
4723            .unwrap();
4724        assert!(!storage_cs_entries.is_empty());
4725        assert_eq!(storage_cs_entries[0].1.key, slot_key);
4726
4727        provider_rw.remove_state_above(0).unwrap();
4728
4729        let restored_account = provider_rw
4730            .tx
4731            .cursor_read::<tables::PlainAccountState>()
4732            .unwrap()
4733            .seek_exact(address)
4734            .unwrap()
4735            .unwrap()
4736            .1;
4737        assert_eq!(restored_account.nonce, 0);
4738
4739        let storage_gone = provider_rw
4740            .tx
4741            .cursor_dup_read::<tables::PlainStorageState>()
4742            .unwrap()
4743            .seek_by_key_subkey(address, slot_key)
4744            .unwrap();
4745        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4746
4747        let account_cs_after = provider_rw
4748            .tx
4749            .cursor_dup_read::<tables::AccountChangeSets>()
4750            .unwrap()
4751            .walk(Some(1))
4752            .unwrap()
4753            .collect::<Result<Vec<_>, _>>()
4754            .unwrap();
4755        assert!(account_cs_after.is_empty());
4756
4757        let storage_cs_after = provider_rw
4758            .tx
4759            .cursor_read::<tables::StorageChangeSets>()
4760            .unwrap()
4761            .walk(Some(BlockNumberAddress((1, address))))
4762            .unwrap()
4763            .collect::<Result<Vec<_>, _>>()
4764            .unwrap();
4765        assert!(storage_cs_after.is_empty());
4766    }
4767
4768    #[test]
4769    fn test_unwind_storage_hashing_legacy() {
4770        let factory = create_test_provider_factory();
4771        let storage_settings = StorageSettings::v1();
4772        assert!(!storage_settings.use_hashed_state());
4773        factory.set_storage_settings_cache(storage_settings);
4774
4775        let address = Address::random();
4776        let hashed_address = keccak256(address);
4777
4778        let plain_slot = B256::random();
4779        let hashed_slot = keccak256(plain_slot);
4780
4781        let current_value = U256::from(100);
4782        let old_value = U256::from(42);
4783
4784        let provider_rw = factory.provider_rw().unwrap();
4785        provider_rw
4786            .tx
4787            .cursor_dup_write::<tables::HashedStorages>()
4788            .unwrap()
4789            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4790            .unwrap();
4791
4792        let changesets = vec![(
4793            BlockNumberAddress((1, address)),
4794            ChangesetEntry { key: StorageSlotKey::Plain(plain_slot), value: old_value },
4795        )];
4796
4797        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4798
4799        assert_eq!(result.len(), 1);
4800        assert!(result.contains_key(&hashed_address));
4801        assert!(result[&hashed_address].contains(&hashed_slot));
4802
4803        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4804        let entry = cursor
4805            .seek_by_key_subkey(hashed_address, hashed_slot)
4806            .unwrap()
4807            .expect("entry should exist");
4808        assert_eq!(entry.key, hashed_slot);
4809        assert_eq!(entry.value, old_value);
4810    }
4811
4812    #[test]
4813    fn test_write_state_and_historical_read_hashed() {
4814        use reth_storage_api::StateProvider;
4815        use reth_trie::{HashedPostState, KeccakKeyHasher};
4816        use revm_database::BundleState;
4817        use revm_state::AccountInfo;
4818
4819        let factory = create_test_provider_factory();
4820        factory.set_storage_settings_cache(StorageSettings::v2());
4821
4822        let address = Address::with_last_byte(1);
4823        let slot = U256::from(5);
4824        let slot_key = B256::from(slot);
4825        let hashed_address = keccak256(address);
4826        let hashed_slot = keccak256(slot_key);
4827
4828        {
4829            let sf = factory.static_file_provider();
4830            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4831            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4832            hw.append_header(&h0, &B256::ZERO).unwrap();
4833            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4834            hw.append_header(&h1, &B256::ZERO).unwrap();
4835            hw.commit().unwrap();
4836
4837            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4838            aw.append_account_changeset(vec![], 0).unwrap();
4839            aw.commit().unwrap();
4840
4841            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4842            sw.append_storage_changeset(vec![], 0).unwrap();
4843            sw.commit().unwrap();
4844        }
4845
4846        let provider_rw = factory.provider_rw().unwrap();
4847
4848        let bundle = BundleState::builder(1..=1)
4849            .state_present_account_info(
4850                address,
4851                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4852            )
4853            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4854            .revert_account_info(1, address, Some(None))
4855            .revert_storage(1, address, vec![(slot, U256::ZERO)])
4856            .build();
4857
4858        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4859
4860        provider_rw
4861            .tx
4862            .put::<tables::BlockBodyIndices>(
4863                1,
4864                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4865            )
4866            .unwrap();
4867
4868        provider_rw
4869            .write_state(
4870                &execution_outcome,
4871                OriginalValuesKnown::Yes,
4872                StateWriteConfig {
4873                    write_receipts: false,
4874                    write_account_changesets: true,
4875                    write_storage_changesets: true,
4876                },
4877            )
4878            .unwrap();
4879
4880        let hashed_state =
4881            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4882        provider_rw.write_hashed_state(&hashed_state).unwrap();
4883
4884        let plain_storage_entries = provider_rw
4885            .tx
4886            .cursor_dup_read::<tables::PlainStorageState>()
4887            .unwrap()
4888            .walk(None)
4889            .unwrap()
4890            .collect::<Result<Vec<_>, _>>()
4891            .unwrap();
4892        assert!(plain_storage_entries.is_empty());
4893
4894        let hashed_entry = provider_rw
4895            .tx
4896            .cursor_dup_read::<tables::HashedStorages>()
4897            .unwrap()
4898            .seek_by_key_subkey(hashed_address, hashed_slot)
4899            .unwrap()
4900            .unwrap();
4901        assert_eq!(hashed_entry.key, hashed_slot);
4902        assert_eq!(hashed_entry.value, U256::from(10));
4903
4904        provider_rw.static_file_provider().commit().unwrap();
4905
4906        let sf = factory.static_file_provider();
4907        let storage_cs = sf.storage_changeset(1).unwrap();
4908        assert!(!storage_cs.is_empty());
4909        assert_eq!(storage_cs[0].1.key.as_b256(), hashed_slot);
4910
4911        let account_cs = sf.account_block_changeset(1).unwrap();
4912        assert!(!account_cs.is_empty());
4913        assert_eq!(account_cs[0].address, address);
4914
4915        let historical_value =
4916            HistoricalStateProviderRef::new(&*provider_rw, 0).storage(address, slot_key).unwrap();
4917        assert_eq!(historical_value, None);
4918    }
4919
4920    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
4921    enum StorageMode {
4922        V1,
4923        V2,
4924    }
4925
4926    fn run_save_blocks_and_verify(mode: StorageMode) {
4927        use alloy_primitives::map::HashMap;
4928
4929        let factory = create_test_provider_factory();
4930
4931        match mode {
4932            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
4933            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
4934        }
4935
4936        let num_blocks = 3u64;
4937        let accounts_per_block = 5usize;
4938        let slots_per_account = 3usize;
4939
4940        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
4941            SealedHeader::new(
4942                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
4943                B256::ZERO,
4944            ),
4945            Default::default(),
4946        );
4947
4948        let genesis_executed = ExecutedBlock::new(
4949            Arc::new(genesis.try_recover().unwrap()),
4950            Arc::new(BlockExecutionOutput {
4951                result: BlockExecutionResult {
4952                    receipts: vec![],
4953                    requests: Default::default(),
4954                    gas_used: 0,
4955                    blob_gas_used: 0,
4956                },
4957                state: Default::default(),
4958            }),
4959            ComputedTrieData::default(),
4960        );
4961        let provider_rw = factory.provider_rw().unwrap();
4962        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
4963        provider_rw.commit().unwrap();
4964
4965        let mut blocks: Vec<ExecutedBlock> = Vec::new();
4966        let mut parent_hash = B256::ZERO;
4967
4968        for block_num in 1..=num_blocks {
4969            let mut builder = BundleState::builder(block_num..=block_num);
4970
4971            for acct_idx in 0..accounts_per_block {
4972                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
4973                let info = AccountInfo {
4974                    nonce: block_num,
4975                    balance: U256::from(block_num * 100 + acct_idx as u64),
4976                    ..Default::default()
4977                };
4978
4979                let storage: HashMap<U256, (U256, U256)> = (1..=slots_per_account as u64)
4980                    .map(|s| {
4981                        (
4982                            U256::from(s + acct_idx as u64 * 100),
4983                            (U256::ZERO, U256::from(block_num * 1000 + s)),
4984                        )
4985                    })
4986                    .collect();
4987
4988                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
4989                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
4990                    .collect();
4991
4992                builder = builder
4993                    .state_present_account_info(address, info)
4994                    .revert_account_info(block_num, address, Some(None))
4995                    .state_storage(address, storage)
4996                    .revert_storage(block_num, address, revert_storage);
4997            }
4998
4999            let bundle = builder.build();
5000
5001            let hashed_state =
5002                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5003
5004            let header = Header {
5005                number: block_num,
5006                parent_hash,
5007                difficulty: U256::from(1),
5008                ..Default::default()
5009            };
5010            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5011                header,
5012                Default::default(),
5013            );
5014            parent_hash = block.hash();
5015
5016            let executed = ExecutedBlock::new(
5017                Arc::new(block.try_recover().unwrap()),
5018                Arc::new(BlockExecutionOutput {
5019                    result: BlockExecutionResult {
5020                        receipts: vec![],
5021                        requests: Default::default(),
5022                        gas_used: 0,
5023                        blob_gas_used: 0,
5024                    },
5025                    state: bundle,
5026                }),
5027                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5028            );
5029            blocks.push(executed);
5030        }
5031
5032        let provider_rw = factory.provider_rw().unwrap();
5033        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5034        provider_rw.commit().unwrap();
5035
5036        let provider = factory.provider().unwrap();
5037
5038        for block_num in 1..=num_blocks {
5039            for acct_idx in 0..accounts_per_block {
5040                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5041                let hashed_address = keccak256(address);
5042
5043                let ha_entry = provider
5044                    .tx_ref()
5045                    .cursor_read::<tables::HashedAccounts>()
5046                    .unwrap()
5047                    .seek_exact(hashed_address)
5048                    .unwrap();
5049                assert!(
5050                    ha_entry.is_some(),
5051                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5052                );
5053
5054                for s in 1..=slots_per_account as u64 {
5055                    let slot = U256::from(s + acct_idx as u64 * 100);
5056                    let slot_key = B256::from(slot);
5057                    let hashed_slot = keccak256(slot_key);
5058
5059                    let hs_entry = provider
5060                        .tx_ref()
5061                        .cursor_dup_read::<tables::HashedStorages>()
5062                        .unwrap()
5063                        .seek_by_key_subkey(hashed_address, hashed_slot)
5064                        .unwrap();
5065                    assert!(
5066                        hs_entry.is_some(),
5067                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5068                    );
5069                    let entry = hs_entry.unwrap();
5070                    assert_eq!(entry.key, hashed_slot);
5071                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5072                }
5073            }
5074        }
5075
5076        for block_num in 1..=num_blocks {
5077            let header = provider.header_by_number(block_num).unwrap();
5078            assert!(header.is_some(), "Header missing for block {block_num}");
5079
5080            let indices = provider.block_body_indices(block_num).unwrap();
5081            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5082        }
5083
5084        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5085        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5086
5087        if mode == StorageMode::V2 {
5088            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5089            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5090
5091            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5092            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5093
5094            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5095            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5096
5097            provider.static_file_provider().commit().unwrap();
5098            let sf = factory.static_file_provider();
5099
5100            for block_num in 1..=num_blocks {
5101                let account_cs = sf.account_block_changeset(block_num).unwrap();
5102                assert!(
5103                    !account_cs.is_empty(),
5104                    "v2: static file AccountChangeSets should exist for block {block_num}"
5105                );
5106
5107                let storage_cs = sf.storage_changeset(block_num).unwrap();
5108                assert!(
5109                    !storage_cs.is_empty(),
5110                    "v2: static file StorageChangeSets should exist for block {block_num}"
5111                );
5112
5113                for (_, entry) in &storage_cs {
5114                    assert!(
5115                        entry.key.is_hashed(),
5116                        "v2: static file storage changeset should have hashed slot keys"
5117                    );
5118                }
5119            }
5120
5121            #[cfg(all(unix, feature = "rocksdb"))]
5122            {
5123                let rocksdb = factory.rocksdb_provider();
5124                for block_num in 1..=num_blocks {
5125                    for acct_idx in 0..accounts_per_block {
5126                        let address =
5127                            Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5128                        let shards = rocksdb.account_history_shards(address).unwrap();
5129                        assert!(
5130                            !shards.is_empty(),
5131                            "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5132                        );
5133
5134                        for s in 1..=slots_per_account as u64 {
5135                            let slot = U256::from(s + acct_idx as u64 * 100);
5136                            let slot_key = B256::from(slot);
5137                            let hashed_slot = keccak256(slot_key);
5138
5139                            let shards =
5140                                rocksdb.storage_history_shards(address, hashed_slot).unwrap();
5141                            assert!(
5142                                !shards.is_empty(),
5143                                "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5144                            );
5145                        }
5146                    }
5147                }
5148            }
5149        } else {
5150            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5151            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5152
5153            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5154            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5155
5156            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5157            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5158
5159            for block_num in 1..=num_blocks {
5160                let storage_entries: Vec<_> = provider
5161                    .tx_ref()
5162                    .cursor_dup_read::<tables::StorageChangeSets>()
5163                    .unwrap()
5164                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5165                    .unwrap()
5166                    .collect::<Result<Vec<_>, _>>()
5167                    .unwrap();
5168                assert!(
5169                    !storage_entries.is_empty(),
5170                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5171                );
5172
5173                for (_, entry) in &storage_entries {
5174                    let slot_key = B256::from(entry.key);
5175                    assert!(
5176                        slot_key != keccak256(slot_key),
5177                        "v1: storage changeset keys should be plain (not hashed)"
5178                    );
5179                }
5180            }
5181
5182            let mdbx_account_history =
5183                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5184            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5185
5186            let mdbx_storage_history =
5187                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5188            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5189        }
5190    }
5191
5192    #[test]
5193    fn test_save_blocks_v1_table_assertions() {
5194        run_save_blocks_and_verify(StorageMode::V1);
5195    }
5196
5197    #[test]
5198    fn test_save_blocks_v2_table_assertions() {
5199        run_save_blocks_and_verify(StorageMode::V2);
5200    }
5201
5202    #[test]
5203    fn test_write_and_remove_state_roundtrip_v2() {
5204        let factory = create_test_provider_factory();
5205        let storage_settings = StorageSettings::v2();
5206        assert!(storage_settings.use_hashed_state());
5207        factory.set_storage_settings_cache(storage_settings);
5208
5209        let address = Address::with_last_byte(1);
5210        let hashed_address = keccak256(address);
5211        let slot = U256::from(5);
5212        let slot_key = B256::from(slot);
5213        let hashed_slot = keccak256(slot_key);
5214
5215        {
5216            let sf = factory.static_file_provider();
5217            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5218            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5219            hw.append_header(&h0, &B256::ZERO).unwrap();
5220            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5221            hw.append_header(&h1, &B256::ZERO).unwrap();
5222            hw.commit().unwrap();
5223
5224            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5225            aw.append_account_changeset(vec![], 0).unwrap();
5226            aw.commit().unwrap();
5227
5228            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5229            sw.append_storage_changeset(vec![], 0).unwrap();
5230            sw.commit().unwrap();
5231        }
5232
5233        {
5234            let provider_rw = factory.provider_rw().unwrap();
5235            provider_rw
5236                .tx
5237                .put::<tables::BlockBodyIndices>(
5238                    0,
5239                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5240                )
5241                .unwrap();
5242            provider_rw
5243                .tx
5244                .put::<tables::BlockBodyIndices>(
5245                    1,
5246                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5247                )
5248                .unwrap();
5249            provider_rw
5250                .tx
5251                .cursor_write::<tables::HashedAccounts>()
5252                .unwrap()
5253                .upsert(
5254                    hashed_address,
5255                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5256                )
5257                .unwrap();
5258            provider_rw.commit().unwrap();
5259        }
5260
5261        let provider_rw = factory.provider_rw().unwrap();
5262
5263        let bundle = BundleState::builder(1..=1)
5264            .state_present_account_info(
5265                address,
5266                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5267            )
5268            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5269            .revert_account_info(1, address, Some(None))
5270            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5271            .build();
5272
5273        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5274
5275        provider_rw
5276            .write_state(
5277                &execution_outcome,
5278                OriginalValuesKnown::Yes,
5279                StateWriteConfig {
5280                    write_receipts: false,
5281                    write_account_changesets: true,
5282                    write_storage_changesets: true,
5283                },
5284            )
5285            .unwrap();
5286
5287        let hashed_state =
5288            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5289        provider_rw.write_hashed_state(&hashed_state).unwrap();
5290
5291        let hashed_account = provider_rw
5292            .tx
5293            .cursor_read::<tables::HashedAccounts>()
5294            .unwrap()
5295            .seek_exact(hashed_address)
5296            .unwrap()
5297            .unwrap()
5298            .1;
5299        assert_eq!(hashed_account.nonce, 1);
5300
5301        let hashed_entry = provider_rw
5302            .tx
5303            .cursor_dup_read::<tables::HashedStorages>()
5304            .unwrap()
5305            .seek_by_key_subkey(hashed_address, hashed_slot)
5306            .unwrap()
5307            .unwrap();
5308        assert_eq!(hashed_entry.key, hashed_slot);
5309        assert_eq!(hashed_entry.value, U256::from(10));
5310
5311        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5312        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5313
5314        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5315        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5316
5317        provider_rw.static_file_provider().commit().unwrap();
5318
5319        let sf = factory.static_file_provider();
5320        let storage_cs = sf.storage_changeset(1).unwrap();
5321        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5322        assert_eq!(
5323            storage_cs[0].1.key.as_b256(),
5324            hashed_slot,
5325            "v2: changeset key should be hashed"
5326        );
5327
5328        provider_rw.remove_state_above(0).unwrap();
5329
5330        let restored_account = provider_rw
5331            .tx
5332            .cursor_read::<tables::HashedAccounts>()
5333            .unwrap()
5334            .seek_exact(hashed_address)
5335            .unwrap();
5336        assert!(
5337            restored_account.is_none(),
5338            "v2: account should be removed (didn't exist before block 1)"
5339        );
5340
5341        let storage_gone = provider_rw
5342            .tx
5343            .cursor_dup_read::<tables::HashedStorages>()
5344            .unwrap()
5345            .seek_by_key_subkey(hashed_address, hashed_slot)
5346            .unwrap();
5347        assert!(
5348            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5349            "v2: storage should be reverted (removed or different key)"
5350        );
5351
5352        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5353        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5354
5355        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5356        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5357    }
5358
5359    #[test]
5360    fn test_populate_bundle_state_hashed_with_hashed_keys() {
5361        let factory = create_test_provider_factory();
5362        factory.set_storage_settings_cache(StorageSettings::v2());
5363
5364        let address = Address::with_last_byte(1);
5365        let hashed_address = keccak256(address);
5366        let slot_key = B256::from(U256::from(42));
5367        let hashed_slot = keccak256(slot_key);
5368        let current_value = U256::from(100);
5369        let old_value = U256::from(50);
5370
5371        let provider_rw = factory.provider_rw().unwrap();
5372
5373        provider_rw
5374            .tx
5375            .cursor_write::<tables::HashedAccounts>()
5376            .unwrap()
5377            .upsert(hashed_address, &Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None })
5378            .unwrap();
5379        provider_rw
5380            .tx
5381            .cursor_dup_write::<tables::HashedStorages>()
5382            .unwrap()
5383            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
5384            .unwrap();
5385
5386        let storage_changeset = vec![(
5387            BlockNumberAddress((1, address)),
5388            ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: old_value },
5389        )];
5390
5391        let account_changeset = vec![(
5392            1u64,
5393            AccountBeforeTx {
5394                address,
5395                info: Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
5396            },
5397        )];
5398
5399        let mut hashed_accounts_cursor =
5400            provider_rw.tx.cursor_read::<tables::HashedAccounts>().unwrap();
5401        let mut hashed_storage_cursor =
5402            provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
5403
5404        let (state, reverts) = provider_rw
5405            .populate_bundle_state_hashed(
5406                account_changeset,
5407                storage_changeset,
5408                &mut hashed_accounts_cursor,
5409                &mut hashed_storage_cursor,
5410            )
5411            .unwrap();
5412
5413        let (_, new_account, storage_map) =
5414            state.get(&address).expect("address should be in state");
5415        assert!(new_account.is_some());
5416        assert_eq!(new_account.unwrap().nonce, 1);
5417
5418        let (old_val, new_val) =
5419            storage_map.get(&hashed_slot).expect("hashed slot should be in storage map");
5420        assert_eq!(*old_val, old_value);
5421        assert_eq!(*new_val, current_value);
5422
5423        let block_reverts = reverts.get(&1).expect("block 1 should have reverts");
5424        let (_, storage_reverts) =
5425            block_reverts.get(&address).expect("address should have reverts");
5426        assert_eq!(storage_reverts.len(), 1);
5427        assert_eq!(storage_reverts[0].key, hashed_slot);
5428        assert_eq!(storage_reverts[0].value, old_value);
5429    }
5430
5431    #[test]
5432    #[cfg(all(unix, feature = "rocksdb"))]
5433    fn test_unwind_storage_history_indices_v2() {
5434        let factory = create_test_provider_factory();
5435        factory.set_storage_settings_cache(StorageSettings::v2());
5436
5437        let address = Address::with_last_byte(1);
5438        let slot_key = B256::from(U256::from(42));
5439        let hashed_slot = keccak256(slot_key);
5440
5441        {
5442            let rocksdb = factory.rocksdb_provider();
5443            let mut batch = rocksdb.batch();
5444            batch.append_storage_history_shard(address, hashed_slot, vec![3u64, 7, 10]).unwrap();
5445            batch.commit().unwrap();
5446
5447            let shards = rocksdb.storage_history_shards(address, hashed_slot).unwrap();
5448            assert!(!shards.is_empty(), "history should be written to rocksdb");
5449        }
5450
5451        let provider_rw = factory.provider_rw().unwrap();
5452
5453        let changesets = vec![
5454            (
5455                BlockNumberAddress((7, address)),
5456                ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: U256::from(5) },
5457            ),
5458            (
5459                BlockNumberAddress((10, address)),
5460                ChangesetEntry { key: StorageSlotKey::Hashed(hashed_slot), value: U256::from(8) },
5461            ),
5462        ];
5463
5464        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5465        assert_eq!(count, 2);
5466
5467        provider_rw.commit().unwrap();
5468
5469        let rocksdb = factory.rocksdb_provider();
5470        let shards = rocksdb.storage_history_shards(address, hashed_slot).unwrap();
5471
5472        assert!(
5473            !shards.is_empty(),
5474            "history shards should still exist with block 3 after partial unwind"
5475        );
5476
5477        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5478        assert!(all_blocks.contains(&3), "block 3 should remain");
5479        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5480        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5481    }
5482}