reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, HashMap, HashSet},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::Database,
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        ShardedKey, StorageBeforeTx, StorageSettings, StoredBlockBodyIndices,
44    },
45    table::Table,
46    tables,
47    transaction::{DbTx, DbTxMut},
48    BlockNumberList, PlainAccountState, PlainStorageState,
49};
50use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
51use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
52use reth_primitives_traits::{
53    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
54};
55use reth_prune_types::{
56    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
57};
58use reth_stages_types::{StageCheckpoint, StageId};
59use reth_static_file_types::StaticFileSegment;
60use reth_storage_api::{
61    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
62    NodePrimitivesProvider, StateProvider, StateWriteConfig, StorageChangeSetReader,
63    StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
64};
65use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
66use reth_trie::{
67    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
68    HashedPostStateSorted, StoredNibbles,
69};
70use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor};
71use revm_database::states::{
72    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
73};
74use std::{
75    cmp::Ordering,
76    collections::{BTreeMap, BTreeSet},
77    fmt::Debug,
78    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
79    sync::Arc,
80    thread,
81    time::Instant,
82};
83use tracing::{debug, instrument, trace};
84
85/// Determines the commit order for database operations.
86#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
87pub enum CommitOrder {
88    /// Normal commit order: static files first, then `RocksDB`, then MDBX.
89    #[default]
90    Normal,
91    /// Unwind commit order: MDBX first, then `RocksDB`, then static files.
92    /// Used for unwind operations to allow recovery by truncating static files on restart.
93    Unwind,
94}
95
96impl CommitOrder {
97    /// Returns true if this is unwind commit order.
98    pub const fn is_unwind(&self) -> bool {
99        matches!(self, Self::Unwind)
100    }
101}
102
103/// A [`DatabaseProvider`] that holds a read-only database transaction.
104pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
105
106/// A [`DatabaseProvider`] that holds a read-write database transaction.
107///
108/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
109/// Once that issue is solved, we can probably revert back to being an alias type.
110#[derive(Debug)]
111pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
112    pub DatabaseProvider<<DB as Database>::TXMut, N>,
113);
114
115impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
116    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
117
118    fn deref(&self) -> &Self::Target {
119        &self.0
120    }
121}
122
123impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
124    fn deref_mut(&mut self) -> &mut Self::Target {
125        &mut self.0
126    }
127}
128
129impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
130    for DatabaseProviderRW<DB, N>
131{
132    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
133        &self.0
134    }
135}
136
137impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
138    /// Commit database transaction and static file if it exists.
139    pub fn commit(self) -> ProviderResult<()> {
140        self.0.commit()
141    }
142
143    /// Consume `DbTx` or `DbTxMut`.
144    pub fn into_tx(self) -> <DB as Database>::TXMut {
145        self.0.into_tx()
146    }
147
148    /// Override the minimum pruning distance for testing purposes.
149    #[cfg(any(test, feature = "test-utils"))]
150    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
151        self.0.minimum_pruning_distance = distance;
152        self
153    }
154}
155
156impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
157    for DatabaseProvider<<DB as Database>::TXMut, N>
158{
159    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
160        provider.0
161    }
162}
163
164/// Mode for [`DatabaseProvider::save_blocks`].
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum SaveBlocksMode {
167    /// Full mode: write block structure + receipts + state + trie.
168    /// Used by engine/production code.
169    Full,
170    /// Blocks only: write block structure (headers, txs, senders, indices).
171    /// Receipts/state/trie are skipped - they may come later via separate calls.
172    /// Used by `insert_block`.
173    BlocksOnly,
174}
175
176impl SaveBlocksMode {
177    /// Returns `true` if this is [`SaveBlocksMode::Full`].
178    pub const fn with_state(self) -> bool {
179        matches!(self, Self::Full)
180    }
181}
182
183/// A provider struct that fetches data from the database.
184/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
185pub struct DatabaseProvider<TX, N: NodeTypes> {
186    /// Database transaction.
187    tx: TX,
188    /// Chain spec
189    chain_spec: Arc<N::ChainSpec>,
190    /// Static File provider
191    static_file_provider: StaticFileProvider<N::Primitives>,
192    /// Pruning configuration
193    prune_modes: PruneModes,
194    /// Node storage handler.
195    storage: Arc<N::Storage>,
196    /// Storage configuration settings for this node
197    storage_settings: Arc<RwLock<StorageSettings>>,
198    /// `RocksDB` provider
199    rocksdb_provider: RocksDBProvider,
200    /// Changeset cache for trie unwinding
201    changeset_cache: ChangesetCache,
202    /// Pending `RocksDB` batches to be committed at provider commit time.
203    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
204    pending_rocksdb_batches: PendingRocksDBBatches,
205    /// Commit order for database operations.
206    commit_order: CommitOrder,
207    /// Minimum distance from tip required for pruning
208    minimum_pruning_distance: u64,
209    /// Database provider metrics
210    metrics: metrics::DatabaseProviderMetrics,
211}
212
213impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        let mut s = f.debug_struct("DatabaseProvider");
216        s.field("tx", &self.tx)
217            .field("chain_spec", &self.chain_spec)
218            .field("static_file_provider", &self.static_file_provider)
219            .field("prune_modes", &self.prune_modes)
220            .field("storage", &self.storage)
221            .field("storage_settings", &self.storage_settings)
222            .field("rocksdb_provider", &self.rocksdb_provider)
223            .field("changeset_cache", &self.changeset_cache)
224            .field("pending_rocksdb_batches", &"<pending batches>")
225            .field("commit_order", &self.commit_order)
226            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
227            .finish()
228    }
229}
230
231impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
232    /// Returns reference to prune modes.
233    pub const fn prune_modes_ref(&self) -> &PruneModes {
234        &self.prune_modes
235    }
236}
237
238impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
239    /// State provider for latest state
240    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
241        trace!(target: "providers::db", "Returning latest state provider");
242        Box::new(LatestStateProviderRef::new(self))
243    }
244
245    /// Storage provider for state at that given block hash
246    pub fn history_by_block_hash<'a>(
247        &'a self,
248        block_hash: BlockHash,
249    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
250        let mut block_number =
251            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
252        if block_number == self.best_block_number().unwrap_or_default() &&
253            block_number == self.last_block_number().unwrap_or_default()
254        {
255            return Ok(Box::new(LatestStateProviderRef::new(self)))
256        }
257
258        // +1 as the changeset that we want is the one that was applied after this block.
259        block_number += 1;
260
261        let account_history_prune_checkpoint =
262            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
263        let storage_history_prune_checkpoint =
264            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
265
266        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
267
268        // If we pruned account or storage history, we can't return state on every historical block.
269        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
270        if let Some(prune_checkpoint_block_number) =
271            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
272        {
273            state_provider = state_provider.with_lowest_available_account_history_block_number(
274                prune_checkpoint_block_number + 1,
275            );
276        }
277        if let Some(prune_checkpoint_block_number) =
278            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
279        {
280            state_provider = state_provider.with_lowest_available_storage_history_block_number(
281                prune_checkpoint_block_number + 1,
282            );
283        }
284
285        Ok(Box::new(state_provider))
286    }
287
288    #[cfg(feature = "test-utils")]
289    /// Sets the prune modes for provider.
290    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
291        self.prune_modes = prune_modes;
292    }
293}
294
295impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
296    type Primitives = N::Primitives;
297}
298
299impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
300    /// Returns a static file provider
301    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
302        self.static_file_provider.clone()
303    }
304
305    fn get_static_file_writer(
306        &self,
307        block: BlockNumber,
308        segment: StaticFileSegment,
309    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
310        self.static_file_provider.get_writer(block, segment)
311    }
312}
313
314impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
315    /// Returns the `RocksDB` provider.
316    fn rocksdb_provider(&self) -> RocksDBProvider {
317        self.rocksdb_provider.clone()
318    }
319
320    #[cfg(all(unix, feature = "rocksdb"))]
321    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
322        self.pending_rocksdb_batches.lock().push(batch);
323    }
324}
325
326impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
327    for DatabaseProvider<TX, N>
328{
329    type ChainSpec = N::ChainSpec;
330
331    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
332        self.chain_spec.clone()
333    }
334}
335
336impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
337    /// Creates a provider with an inner read-write transaction.
338    #[allow(clippy::too_many_arguments)]
339    fn new_rw_inner(
340        tx: TX,
341        chain_spec: Arc<N::ChainSpec>,
342        static_file_provider: StaticFileProvider<N::Primitives>,
343        prune_modes: PruneModes,
344        storage: Arc<N::Storage>,
345        storage_settings: Arc<RwLock<StorageSettings>>,
346        rocksdb_provider: RocksDBProvider,
347        changeset_cache: ChangesetCache,
348        commit_order: CommitOrder,
349    ) -> Self {
350        Self {
351            tx,
352            chain_spec,
353            static_file_provider,
354            prune_modes,
355            storage,
356            storage_settings,
357            rocksdb_provider,
358            changeset_cache,
359            pending_rocksdb_batches: Default::default(),
360            commit_order,
361            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
362            metrics: metrics::DatabaseProviderMetrics::default(),
363        }
364    }
365
366    /// Creates a provider with an inner read-write transaction using normal commit order.
367    #[allow(clippy::too_many_arguments)]
368    pub fn new_rw(
369        tx: TX,
370        chain_spec: Arc<N::ChainSpec>,
371        static_file_provider: StaticFileProvider<N::Primitives>,
372        prune_modes: PruneModes,
373        storage: Arc<N::Storage>,
374        storage_settings: Arc<RwLock<StorageSettings>>,
375        rocksdb_provider: RocksDBProvider,
376        changeset_cache: ChangesetCache,
377    ) -> Self {
378        Self::new_rw_inner(
379            tx,
380            chain_spec,
381            static_file_provider,
382            prune_modes,
383            storage,
384            storage_settings,
385            rocksdb_provider,
386            changeset_cache,
387            CommitOrder::Normal,
388        )
389    }
390
391    /// Creates a provider with an inner read-write transaction using unwind commit order.
392    #[allow(clippy::too_many_arguments)]
393    pub fn new_unwind_rw(
394        tx: TX,
395        chain_spec: Arc<N::ChainSpec>,
396        static_file_provider: StaticFileProvider<N::Primitives>,
397        prune_modes: PruneModes,
398        storage: Arc<N::Storage>,
399        storage_settings: Arc<RwLock<StorageSettings>>,
400        rocksdb_provider: RocksDBProvider,
401        changeset_cache: ChangesetCache,
402    ) -> Self {
403        Self::new_rw_inner(
404            tx,
405            chain_spec,
406            static_file_provider,
407            prune_modes,
408            storage,
409            storage_settings,
410            rocksdb_provider,
411            changeset_cache,
412            CommitOrder::Unwind,
413        )
414    }
415}
416
417impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
418    fn as_ref(&self) -> &Self {
419        self
420    }
421}
422
423impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
424    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
425    ///
426    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
427    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
428    where
429        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
430    {
431        #[cfg(all(unix, feature = "rocksdb"))]
432        let rocksdb = self.rocksdb_provider();
433        #[cfg(all(unix, feature = "rocksdb"))]
434        let rocksdb_batch = rocksdb.batch();
435        #[cfg(not(all(unix, feature = "rocksdb")))]
436        let rocksdb_batch = ();
437
438        let (result, raw_batch) = f(rocksdb_batch)?;
439
440        #[cfg(all(unix, feature = "rocksdb"))]
441        if let Some(batch) = raw_batch {
442            self.set_pending_rocksdb_batch(batch);
443        }
444        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
445
446        Ok(result)
447    }
448
449    /// Creates the context for static file writes.
450    fn static_file_write_ctx(
451        &self,
452        save_mode: SaveBlocksMode,
453        first_block: BlockNumber,
454        last_block: BlockNumber,
455    ) -> ProviderResult<StaticFileWriteCtx> {
456        let tip = self.last_block_number()?.max(last_block);
457        Ok(StaticFileWriteCtx {
458            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
459                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
460            write_receipts: save_mode.with_state() &&
461                EitherWriter::receipts_destination(self).is_static_file(),
462            write_account_changesets: save_mode.with_state() &&
463                EitherWriterDestination::account_changesets(self).is_static_file(),
464            write_storage_changesets: save_mode.with_state() &&
465                EitherWriterDestination::storage_changesets(self).is_static_file(),
466            tip,
467            receipts_prune_mode: self.prune_modes.receipts,
468            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
469            receipts_prunable: self
470                .static_file_provider
471                .get_highest_static_file_tx(StaticFileSegment::Receipts)
472                .is_none() &&
473                PruneMode::Distance(self.minimum_pruning_distance)
474                    .should_prune(first_block, tip),
475        })
476    }
477
478    /// Creates the context for `RocksDB` writes.
479    #[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
480    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
481        RocksDBWriteCtx {
482            first_block_number: first_block,
483            prune_tx_lookup: self.prune_modes.transaction_lookup,
484            storage_settings: self.cached_storage_settings(),
485            pending_batches: self.pending_rocksdb_batches.clone(),
486        }
487    }
488
489    /// Writes executed blocks and state to storage.
490    ///
491    /// This method parallelizes static file (SF) writes with MDBX writes.
492    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
493    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
494    ///
495    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
496    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
497    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
498    pub fn save_blocks(
499        &self,
500        blocks: Vec<ExecutedBlock<N::Primitives>>,
501        save_mode: SaveBlocksMode,
502    ) -> ProviderResult<()> {
503        if blocks.is_empty() {
504            debug!(target: "providers::db", "Attempted to write empty block range");
505            return Ok(())
506        }
507
508        let total_start = Instant::now();
509        let block_count = blocks.len() as u64;
510        let first_number = blocks.first().unwrap().recovered_block().number();
511        let last_block_number = blocks.last().unwrap().recovered_block().number();
512
513        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
514
515        // Compute tx_nums upfront (both threads need these)
516        let first_tx_num = self
517            .tx
518            .cursor_read::<tables::TransactionBlocks>()?
519            .last()?
520            .map(|(n, _)| n + 1)
521            .unwrap_or_default();
522
523        let tx_nums: Vec<TxNumber> = {
524            let mut nums = Vec::with_capacity(blocks.len());
525            let mut current = first_tx_num;
526            for block in &blocks {
527                nums.push(current);
528                current += block.recovered_block().body().transaction_count() as u64;
529            }
530            nums
531        };
532
533        let mut timings = metrics::SaveBlocksTimings { block_count, ..Default::default() };
534
535        // avoid capturing &self.tx in scope below.
536        let sf_provider = &self.static_file_provider;
537        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
538        #[cfg(all(unix, feature = "rocksdb"))]
539        let rocksdb_provider = self.rocksdb_provider.clone();
540        #[cfg(all(unix, feature = "rocksdb"))]
541        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
542
543        thread::scope(|s| {
544            // SF writes
545            let sf_handle = s.spawn(|| {
546                let start = Instant::now();
547                sf_provider.write_blocks_data(&blocks, &tx_nums, sf_ctx)?;
548                Ok::<_, ProviderError>(start.elapsed())
549            });
550
551            // RocksDB writes
552            #[cfg(all(unix, feature = "rocksdb"))]
553            let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
554                s.spawn(|| {
555                    let start = Instant::now();
556                    rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
557                    Ok::<_, ProviderError>(start.elapsed())
558                })
559            });
560
561            // MDBX writes
562            let mdbx_start = Instant::now();
563
564            // Collect all transaction hashes across all blocks, sort them, and write in batch
565            if !self.cached_storage_settings().transaction_hash_numbers_in_rocksdb &&
566                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
567            {
568                let start = Instant::now();
569                let total_tx_count: usize =
570                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
571                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
572                for (i, block) in blocks.iter().enumerate() {
573                    let recovered_block = block.recovered_block();
574                    let mut tx_num = tx_nums[i];
575                    for transaction in recovered_block.body().transactions_iter() {
576                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
577                        tx_num += 1;
578                    }
579                }
580
581                // Sort by hash for optimal MDBX insertion performance
582                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
583
584                // Write all transaction hash numbers in a single batch
585                self.with_rocksdb_batch(|batch| {
586                    let mut tx_hash_writer =
587                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
588                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
589                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
590                    Ok(((), raw_batch))
591                })?;
592                self.metrics.record_duration(
593                    metrics::Action::InsertTransactionHashNumbers,
594                    start.elapsed(),
595                );
596            }
597
598            for (i, block) in blocks.iter().enumerate() {
599                let recovered_block = block.recovered_block();
600
601                let start = Instant::now();
602                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
603                timings.insert_block += start.elapsed();
604
605                if save_mode.with_state() {
606                    let execution_output = block.execution_outcome();
607
608                    // Write state and changesets to the database.
609                    // Must be written after blocks because of the receipt lookup.
610                    // Skip receipts/account changesets if they're being written to static files.
611                    let start = Instant::now();
612                    self.write_state(
613                        WriteStateInput::Single {
614                            outcome: execution_output,
615                            block: recovered_block.number(),
616                        },
617                        OriginalValuesKnown::No,
618                        StateWriteConfig {
619                            write_receipts: !sf_ctx.write_receipts,
620                            write_account_changesets: !sf_ctx.write_account_changesets,
621                        },
622                    )?;
623                    timings.write_state += start.elapsed();
624
625                    let trie_data = block.trie_data();
626
627                    // insert hashes and intermediate merkle nodes
628                    let start = Instant::now();
629                    self.write_hashed_state(&trie_data.hashed_state)?;
630                    timings.write_hashed_state += start.elapsed();
631                }
632            }
633
634            // Write all trie updates in a single batch.
635            // This reduces cursor open/close overhead from N calls to 1.
636            if save_mode.with_state() {
637                let start = Instant::now();
638
639                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
640                let merged =
641                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
642
643                if !merged.is_empty() {
644                    self.write_trie_updates_sorted(&merged)?;
645                }
646                timings.write_trie_updates += start.elapsed();
647            }
648
649            // Full mode: update history indices
650            if save_mode.with_state() {
651                let start = Instant::now();
652                self.update_history_indices(first_number..=last_block_number)?;
653                timings.update_history_indices = start.elapsed();
654            }
655
656            // Update pipeline progress
657            let start = Instant::now();
658            self.update_pipeline_stages(last_block_number, false)?;
659            timings.update_pipeline_stages = start.elapsed();
660
661            timings.mdbx = mdbx_start.elapsed();
662
663            // Wait for SF thread
664            timings.sf = sf_handle
665                .join()
666                .map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;
667
668            // Wait for RocksDB thread
669            #[cfg(all(unix, feature = "rocksdb"))]
670            if let Some(handle) = rocksdb_handle {
671                timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
672            }
673
674            timings.total = total_start.elapsed();
675
676            self.metrics.record_save_blocks(&timings);
677            debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
678
679            Ok(())
680        })
681    }
682
683    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
684    ///
685    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
686    #[instrument(level = "debug", target = "providers::db", skip_all)]
687    fn insert_block_mdbx_only(
688        &self,
689        block: &RecoveredBlock<BlockTy<N>>,
690        first_tx_num: TxNumber,
691    ) -> ProviderResult<StoredBlockBodyIndices> {
692        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
693            EitherWriterDestination::senders(self).is_database()
694        {
695            let start = Instant::now();
696            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
697            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
698            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
699                cursor.append(tx_num, &sender)?;
700            }
701            self.metrics
702                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
703        }
704
705        let block_number = block.number();
706        let tx_count = block.body().transaction_count() as u64;
707
708        let start = Instant::now();
709        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
710        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
711
712        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
713
714        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
715    }
716
717    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
718    /// `Ommers`/`Withdrawals`).
719    fn write_block_body_indices(
720        &self,
721        block_number: BlockNumber,
722        body: &BodyTy<N>,
723        first_tx_num: TxNumber,
724        tx_count: u64,
725    ) -> ProviderResult<()> {
726        // MDBX: BlockBodyIndices
727        let start = Instant::now();
728        self.tx
729            .cursor_write::<tables::BlockBodyIndices>()?
730            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
731        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
732
733        // MDBX: TransactionBlocks (last tx -> block mapping)
734        if tx_count > 0 {
735            let start = Instant::now();
736            self.tx
737                .cursor_write::<tables::TransactionBlocks>()?
738                .append(first_tx_num + tx_count - 1, &block_number)?;
739            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
740        }
741
742        // MDBX: Ommers/Withdrawals
743        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
744
745        Ok(())
746    }
747
748    /// Unwinds trie state starting at and including the given block.
749    ///
750    /// This includes calculating the resulted state root and comparing it with the parent block
751    /// state root.
752    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
753        let changed_accounts = self
754            .tx
755            .cursor_read::<tables::AccountChangeSets>()?
756            .walk_range(from..)?
757            .collect::<Result<Vec<_>, _>>()?;
758
759        // Unwind account hashes.
760        self.unwind_account_hashing(changed_accounts.iter())?;
761
762        // Unwind account history indices.
763        self.unwind_account_history_indices(changed_accounts.iter())?;
764
765        let storage_start = BlockNumberAddress((from, Address::ZERO));
766        let changed_storages = self
767            .tx
768            .cursor_read::<tables::StorageChangeSets>()?
769            .walk_range(storage_start..)?
770            .collect::<Result<Vec<_>, _>>()?;
771
772        // Unwind storage hashes.
773        self.unwind_storage_hashing(changed_storages.iter().copied())?;
774
775        // Unwind storage history indices.
776        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
777
778        // Unwind accounts/storages trie tables using the revert.
779        // Get the database tip block number
780        let db_tip_block = self
781            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
782            .as_ref()
783            .map(|chk| chk.block_number)
784            .ok_or_else(|| ProviderError::InsufficientChangesets {
785                requested: from,
786                available: 0..=0,
787            })?;
788
789        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
790        self.write_trie_updates_sorted(&trie_revert)?;
791
792        Ok(())
793    }
794
795    /// Removes receipts from all transactions starting with provided number (inclusive).
796    fn remove_receipts_from(
797        &self,
798        from_tx: TxNumber,
799        last_block: BlockNumber,
800    ) -> ProviderResult<()> {
801        // iterate over block body and remove receipts
802        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
803
804        if EitherWriter::receipts_destination(self).is_static_file() {
805            let static_file_receipt_num =
806                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
807
808            let to_delete = static_file_receipt_num
809                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
810                .unwrap_or_default();
811
812            self.static_file_provider
813                .latest_writer(StaticFileSegment::Receipts)?
814                .prune_receipts(to_delete, last_block)?;
815        }
816
817        Ok(())
818    }
819}
820
821impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
822    fn try_into_history_at_block(
823        self,
824        mut block_number: BlockNumber,
825    ) -> ProviderResult<StateProviderBox> {
826        // if the block number is the same as the currently best block number on disk we can use the
827        // latest state provider here
828        if block_number == self.best_block_number().unwrap_or_default() {
829            return Ok(Box::new(LatestStateProvider::new(self)))
830        }
831
832        // +1 as the changeset that we want is the one that was applied after this block.
833        block_number += 1;
834
835        let account_history_prune_checkpoint =
836            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
837        let storage_history_prune_checkpoint =
838            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
839
840        let mut state_provider = HistoricalStateProvider::new(self, block_number);
841
842        // If we pruned account or storage history, we can't return state on every historical block.
843        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
844        if let Some(prune_checkpoint_block_number) =
845            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
846        {
847            state_provider = state_provider.with_lowest_available_account_history_block_number(
848                prune_checkpoint_block_number + 1,
849            );
850        }
851        if let Some(prune_checkpoint_block_number) =
852            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
853        {
854            state_provider = state_provider.with_lowest_available_storage_history_block_number(
855                prune_checkpoint_block_number + 1,
856            );
857        }
858
859        Ok(Box::new(state_provider))
860    }
861}
862
863/// For a given key, unwind all history shards that contain block numbers at or above the given
864/// block number.
865///
866/// S - Sharded key subtype.
867/// T - Table to walk over.
868/// C - Cursor implementation.
869///
870/// This function walks the entries from the given start key and deletes all shards that belong to
871/// the key and contain block numbers at or above the given block number. Shards entirely below
872/// the block number are preserved.
873///
874/// The boundary shard (the shard that spans across the block number) is removed from the database.
875/// Any indices that are below the block number are filtered out and returned for reinsertion.
876/// The boundary shard is returned for reinsertion (if it's not empty).
877fn unwind_history_shards<S, T, C>(
878    cursor: &mut C,
879    start_key: T::Key,
880    block_number: BlockNumber,
881    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
882) -> ProviderResult<Vec<u64>>
883where
884    T: Table<Value = BlockNumberList>,
885    T::Key: AsRef<ShardedKey<S>>,
886    C: DbCursorRO<T> + DbCursorRW<T>,
887{
888    // Start from the given key and iterate through shards
889    let mut item = cursor.seek_exact(start_key)?;
890    while let Some((sharded_key, list)) = item {
891        // If the shard does not belong to the key, break.
892        if !shard_belongs_to_key(&sharded_key) {
893            break
894        }
895
896        // Always delete the current shard from the database first
897        // We'll decide later what (if anything) to reinsert
898        cursor.delete_current()?;
899
900        // Get the first (lowest) block number in this shard
901        // All block numbers in a shard are sorted in ascending order
902        let first = list.iter().next().expect("List can't be empty");
903
904        // Case 1: Entire shard is at or above the unwinding point
905        // Keep it deleted (don't return anything for reinsertion)
906        if first >= block_number {
907            item = cursor.prev()?;
908            continue
909        }
910        // Case 2: This is a boundary shard (spans across the unwinding point)
911        // The shard contains some blocks below and some at/above the unwinding point
912        else if block_number <= sharded_key.as_ref().highest_block_number {
913            // Return only the block numbers that are below the unwinding point
914            // These will be reinserted to preserve the historical data
915            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
916        }
917        // Case 3: Entire shard is below the unwinding point
918        // Return all block numbers for reinsertion (preserve entire shard)
919        return Ok(list.iter().collect::<Vec<_>>())
920    }
921
922    // No shards found or all processed
923    Ok(Vec::new())
924}
925
926impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
927    /// Creates a provider with an inner read-only transaction.
928    #[allow(clippy::too_many_arguments)]
929    pub fn new(
930        tx: TX,
931        chain_spec: Arc<N::ChainSpec>,
932        static_file_provider: StaticFileProvider<N::Primitives>,
933        prune_modes: PruneModes,
934        storage: Arc<N::Storage>,
935        storage_settings: Arc<RwLock<StorageSettings>>,
936        rocksdb_provider: RocksDBProvider,
937        changeset_cache: ChangesetCache,
938    ) -> Self {
939        Self {
940            tx,
941            chain_spec,
942            static_file_provider,
943            prune_modes,
944            storage,
945            storage_settings,
946            rocksdb_provider,
947            changeset_cache,
948            pending_rocksdb_batches: Default::default(),
949            commit_order: CommitOrder::Normal,
950            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
951            metrics: metrics::DatabaseProviderMetrics::default(),
952        }
953    }
954
955    /// Consume `DbTx` or `DbTxMut`.
956    pub fn into_tx(self) -> TX {
957        self.tx
958    }
959
960    /// Pass `DbTx` or `DbTxMut` mutable reference.
961    pub const fn tx_mut(&mut self) -> &mut TX {
962        &mut self.tx
963    }
964
965    /// Pass `DbTx` or `DbTxMut` immutable reference.
966    pub const fn tx_ref(&self) -> &TX {
967        &self.tx
968    }
969
970    /// Returns a reference to the chain specification.
971    pub fn chain_spec(&self) -> &N::ChainSpec {
972        &self.chain_spec
973    }
974}
975
976impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
977    fn recovered_block<H, HF, B, BF>(
978        &self,
979        id: BlockHashOrNumber,
980        _transaction_kind: TransactionVariant,
981        header_by_number: HF,
982        construct_block: BF,
983    ) -> ProviderResult<Option<B>>
984    where
985        H: AsRef<HeaderTy<N>>,
986        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
987        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
988    {
989        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
990        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
991
992        // Get the block body
993        //
994        // If the body indices are not found, this means that the transactions either do not exist
995        // in the database yet, or they do exit but are not indexed. If they exist but are not
996        // indexed, we don't have enough information to return the block anyways, so we return
997        // `None`.
998        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
999
1000        let tx_range = body.tx_num_range();
1001
1002        let (transactions, senders) = if tx_range.is_empty() {
1003            (vec![], vec![])
1004        } else {
1005            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
1006        };
1007
1008        let body = self
1009            .storage
1010            .reader()
1011            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1012            .pop()
1013            .ok_or(ProviderError::InvalidStorageOutput)?;
1014
1015        construct_block(header, body, senders)
1016    }
1017
1018    /// Returns a range of blocks from the database.
1019    ///
1020    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1021    /// construct blocks from the following inputs:
1022    ///     – Header
1023    ///     - Range of transaction numbers
1024    ///     – Ommers
1025    ///     – Withdrawals
1026    ///     – Senders
1027    fn block_range<F, H, HF, R>(
1028        &self,
1029        range: RangeInclusive<BlockNumber>,
1030        headers_range: HF,
1031        mut assemble_block: F,
1032    ) -> ProviderResult<Vec<R>>
1033    where
1034        H: AsRef<HeaderTy<N>>,
1035        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1036        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1037    {
1038        if range.is_empty() {
1039            return Ok(Vec::new())
1040        }
1041
1042        let len = range.end().saturating_sub(*range.start()) as usize;
1043        let mut blocks = Vec::with_capacity(len);
1044
1045        let headers = headers_range(range.clone())?;
1046
1047        // If the body indices are not found, this means that the transactions either do
1048        // not exist in the database yet, or they do exit but are
1049        // not indexed. If they exist but are not indexed, we don't
1050        // have enough information to return the block anyways, so
1051        // we skip the block.
1052        let present_headers = self
1053            .block_body_indices_range(range)?
1054            .into_iter()
1055            .map(|b| b.tx_num_range())
1056            .zip(headers)
1057            .collect::<Vec<_>>();
1058
1059        let mut inputs = Vec::with_capacity(present_headers.len());
1060        for (tx_range, header) in &present_headers {
1061            let transactions = if tx_range.is_empty() {
1062                Vec::new()
1063            } else {
1064                self.transactions_by_tx_range(tx_range.clone())?
1065            };
1066
1067            inputs.push((header.as_ref(), transactions));
1068        }
1069
1070        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1071
1072        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1073            blocks.push(assemble_block(header, body, tx_range)?);
1074        }
1075
1076        Ok(blocks)
1077    }
1078
1079    /// Returns a range of blocks from the database, along with the senders of each
1080    /// transaction in the blocks.
1081    ///
1082    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1083    /// construct blocks from the following inputs:
1084    ///     – Header
1085    ///     - Transactions
1086    ///     – Ommers
1087    ///     – Withdrawals
1088    ///     – Senders
1089    fn block_with_senders_range<H, HF, B, BF>(
1090        &self,
1091        range: RangeInclusive<BlockNumber>,
1092        headers_range: HF,
1093        assemble_block: BF,
1094    ) -> ProviderResult<Vec<B>>
1095    where
1096        H: AsRef<HeaderTy<N>>,
1097        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1098        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1099    {
1100        self.block_range(range, headers_range, |header, body, tx_range| {
1101            let senders = if tx_range.is_empty() {
1102                Vec::new()
1103            } else {
1104                let known_senders: HashMap<TxNumber, Address> =
1105                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1106
1107                let mut senders = Vec::with_capacity(body.transactions().len());
1108                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1109                    match known_senders.get(&tx_num) {
1110                        None => {
1111                            // recover the sender from the transaction if not found
1112                            let sender = tx.recover_signer_unchecked()?;
1113                            senders.push(sender);
1114                        }
1115                        Some(sender) => senders.push(*sender),
1116                    }
1117                }
1118
1119                senders
1120            };
1121
1122            assemble_block(header, body, senders)
1123        })
1124    }
1125
1126    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
1127    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
1128    /// account changesets.
1129    fn populate_bundle_state<A, S>(
1130        &self,
1131        account_changeset: Vec<(u64, AccountBeforeTx)>,
1132        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1133        plain_accounts_cursor: &mut A,
1134        plain_storage_cursor: &mut S,
1135    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
1136    where
1137        A: DbCursorRO<PlainAccountState>,
1138        S: DbDupCursorRO<PlainStorageState>,
1139    {
1140        // iterate previous value and get plain state value to create changeset
1141        // Double option around Account represent if Account state is know (first option) and
1142        // account is removed (Second Option)
1143        let mut state: BundleStateInit = HashMap::default();
1144
1145        // This is not working for blocks that are not at tip. as plain state is not the last
1146        // state of end range. We should rename the functions or add support to access
1147        // History state. Accessing history state can be tricky but we are not gaining
1148        // anything.
1149
1150        let mut reverts: RevertsInit = HashMap::default();
1151
1152        // add account changeset changes
1153        for (block_number, account_before) in account_changeset.into_iter().rev() {
1154            let AccountBeforeTx { info: old_info, address } = account_before;
1155            match state.entry(address) {
1156                hash_map::Entry::Vacant(entry) => {
1157                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1158                    entry.insert((old_info, new_info, HashMap::default()));
1159                }
1160                hash_map::Entry::Occupied(mut entry) => {
1161                    // overwrite old account state.
1162                    entry.get_mut().0 = old_info;
1163                }
1164            }
1165            // insert old info into reverts.
1166            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1167        }
1168
1169        // add storage changeset changes
1170        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1171            let BlockNumberAddress((block_number, address)) = block_and_address;
1172            // get account state or insert from plain state.
1173            let account_state = match state.entry(address) {
1174                hash_map::Entry::Vacant(entry) => {
1175                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
1176                    entry.insert((present_info, present_info, HashMap::default()))
1177                }
1178                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1179            };
1180
1181            // match storage.
1182            match account_state.2.entry(old_storage.key) {
1183                hash_map::Entry::Vacant(entry) => {
1184                    let new_storage = plain_storage_cursor
1185                        .seek_by_key_subkey(address, old_storage.key)?
1186                        .filter(|storage| storage.key == old_storage.key)
1187                        .unwrap_or_default();
1188                    entry.insert((old_storage.value, new_storage.value));
1189                }
1190                hash_map::Entry::Occupied(mut entry) => {
1191                    entry.get_mut().0 = old_storage.value;
1192                }
1193            };
1194
1195            reverts
1196                .entry(block_number)
1197                .or_default()
1198                .entry(address)
1199                .or_default()
1200                .1
1201                .push(old_storage);
1202        }
1203
1204        Ok((state, reverts))
1205    }
1206}
1207
1208impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1209    /// Insert history index to the database.
1210    ///
1211    /// For each updated partial key, this function retrieves the last shard from the database
1212    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1213    /// the shards back into the database.
1214    ///
1215    /// This function is used by history indexing stages.
1216    fn append_history_index<P, T>(
1217        &self,
1218        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1219        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1220    ) -> ProviderResult<()>
1221    where
1222        P: Copy,
1223        T: Table<Value = BlockNumberList>,
1224    {
1225        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1226        // will append duplicate entries instead of updating existing ones, causing data corruption.
1227        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1228
1229        let mut cursor = self.tx.cursor_write::<T>()?;
1230
1231        for (partial_key, indices) in index_updates {
1232            let last_key = sharded_key_factory(partial_key, u64::MAX);
1233            let mut last_shard = cursor
1234                .seek_exact(last_key.clone())?
1235                .map(|(_, list)| list)
1236                .unwrap_or_else(BlockNumberList::empty);
1237
1238            last_shard.append(indices).map_err(ProviderError::other)?;
1239
1240            // fast path: all indices fit in one shard
1241            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1242                cursor.upsert(last_key, &last_shard)?;
1243                continue;
1244            }
1245
1246            // slow path: rechunk into multiple shards
1247            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1248            let mut chunks_peekable = chunks.into_iter().peekable();
1249
1250            while let Some(chunk) = chunks_peekable.next() {
1251                let shard = BlockNumberList::new_pre_sorted(chunk);
1252                let highest_block_number = if chunks_peekable.peek().is_some() {
1253                    shard.iter().next_back().expect("`chunks` does not return empty list")
1254                } else {
1255                    // Insert last list with `u64::MAX`.
1256                    u64::MAX
1257                };
1258
1259                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1260            }
1261        }
1262
1263        Ok(())
1264    }
1265}
1266
1267impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1268    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1269        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1270    }
1271}
1272
1273impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1274    fn changed_accounts_with_range(
1275        &self,
1276        range: RangeInclusive<BlockNumber>,
1277    ) -> ProviderResult<BTreeSet<Address>> {
1278        let mut reader = EitherReader::new_account_changesets(self)?;
1279
1280        reader.changed_accounts_with_range(range)
1281    }
1282
1283    fn basic_accounts(
1284        &self,
1285        iter: impl IntoIterator<Item = Address>,
1286    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1287        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1288        Ok(iter
1289            .into_iter()
1290            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
1291            .collect::<Result<Vec<_>, _>>()?)
1292    }
1293
1294    fn changed_accounts_and_blocks_with_range(
1295        &self,
1296        range: RangeInclusive<BlockNumber>,
1297    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1298        let highest_static_block = self
1299            .static_file_provider
1300            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1301
1302        if let Some(highest) = highest_static_block &&
1303            self.cached_storage_settings().account_changesets_in_static_files
1304        {
1305            let start = *range.start();
1306            let static_end = (*range.end()).min(highest + 1);
1307
1308            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1309            if start <= static_end {
1310                for block in start..=static_end {
1311                    let block_changesets = self.account_block_changeset(block)?;
1312                    for changeset in block_changesets {
1313                        changed_accounts_and_blocks
1314                            .entry(changeset.address)
1315                            .or_default()
1316                            .push(block);
1317                    }
1318                }
1319            }
1320
1321            Ok(changed_accounts_and_blocks)
1322        } else {
1323            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1324
1325            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1326                BTreeMap::new(),
1327                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1328                    let (index, account) = entry?;
1329                    accounts.entry(account.address).or_default().push(index);
1330                    Ok(accounts)
1331                },
1332            )?;
1333
1334            Ok(account_transitions)
1335        }
1336    }
1337}
1338
1339impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1340    fn storage_changeset(
1341        &self,
1342        block_number: BlockNumber,
1343    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1344        if self.cached_storage_settings().storage_changesets_in_static_files {
1345            self.static_file_provider.storage_changeset(block_number)
1346        } else {
1347            let range = block_number..=block_number;
1348            let storage_range = BlockNumberAddress::range(range);
1349            self.tx
1350                .cursor_dup_read::<tables::StorageChangeSets>()?
1351                .walk_range(storage_range)?
1352                .map(|result| -> ProviderResult<_> { Ok(result?) })
1353                .collect()
1354        }
1355    }
1356
1357    fn get_storage_before_block(
1358        &self,
1359        block_number: BlockNumber,
1360        address: Address,
1361        storage_key: B256,
1362    ) -> ProviderResult<Option<StorageEntry>> {
1363        if self.cached_storage_settings().storage_changesets_in_static_files {
1364            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1365        } else {
1366            self.tx
1367                .cursor_dup_read::<tables::StorageChangeSets>()?
1368                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1369                .filter(|entry| entry.key == storage_key)
1370                .map(Ok)
1371                .transpose()
1372        }
1373    }
1374
1375    fn storage_changesets_range(
1376        &self,
1377        range: RangeInclusive<BlockNumber>,
1378    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1379        if self.cached_storage_settings().storage_changesets_in_static_files {
1380            self.static_file_provider.storage_changesets_range(range)
1381        } else {
1382            self.tx
1383                .cursor_dup_read::<tables::StorageChangeSets>()?
1384                .walk_range(BlockNumberAddress::range(range))?
1385                .map(|result| -> ProviderResult<_> { Ok(result?) })
1386                .collect()
1387        }
1388    }
1389
1390    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1391        if self.cached_storage_settings().storage_changesets_in_static_files {
1392            self.static_file_provider.storage_changeset_count()
1393        } else {
1394            Ok(self.tx.entries::<tables::StorageChangeSets>()?)
1395        }
1396    }
1397}
1398
1399impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1400    fn account_block_changeset(
1401        &self,
1402        block_number: BlockNumber,
1403    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1404        if self.cached_storage_settings().account_changesets_in_static_files {
1405            let static_changesets =
1406                self.static_file_provider.account_block_changeset(block_number)?;
1407            Ok(static_changesets)
1408        } else {
1409            let range = block_number..=block_number;
1410            self.tx
1411                .cursor_read::<tables::AccountChangeSets>()?
1412                .walk_range(range)?
1413                .map(|result| -> ProviderResult<_> {
1414                    let (_, account_before) = result?;
1415                    Ok(account_before)
1416                })
1417                .collect()
1418        }
1419    }
1420
1421    fn get_account_before_block(
1422        &self,
1423        block_number: BlockNumber,
1424        address: Address,
1425    ) -> ProviderResult<Option<AccountBeforeTx>> {
1426        if self.cached_storage_settings().account_changesets_in_static_files {
1427            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1428        } else {
1429            self.tx
1430                .cursor_dup_read::<tables::AccountChangeSets>()?
1431                .seek_by_key_subkey(block_number, address)?
1432                .filter(|acc| acc.address == address)
1433                .map(Ok)
1434                .transpose()
1435        }
1436    }
1437
1438    fn account_changesets_range(
1439        &self,
1440        range: impl core::ops::RangeBounds<BlockNumber>,
1441    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1442        let range = to_range(range);
1443        let mut changesets = Vec::new();
1444        if self.cached_storage_settings().account_changesets_in_static_files &&
1445            let Some(highest) = self
1446                .static_file_provider
1447                .get_highest_static_file_block(StaticFileSegment::AccountChangeSets)
1448        {
1449            let static_end = range.end.min(highest + 1);
1450            if range.start < static_end {
1451                for block in range.start..static_end {
1452                    let block_changesets = self.account_block_changeset(block)?;
1453                    for changeset in block_changesets {
1454                        changesets.push((block, changeset));
1455                    }
1456                }
1457            }
1458        } else {
1459            // Fetch from database for blocks not in static files
1460            let mut cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1461            for entry in cursor.walk_range(range)? {
1462                let (block_num, account_before) = entry?;
1463                changesets.push((block_num, account_before));
1464            }
1465        }
1466
1467        Ok(changesets)
1468    }
1469
1470    fn account_changeset_count(&self) -> ProviderResult<usize> {
1471        // check if account changesets are in static files, otherwise just count the changeset
1472        // entries in the DB
1473        if self.cached_storage_settings().account_changesets_in_static_files {
1474            self.static_file_provider.account_changeset_count()
1475        } else {
1476            Ok(self.tx.entries::<tables::AccountChangeSets>()?)
1477        }
1478    }
1479}
1480
1481impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1482    for DatabaseProvider<TX, N>
1483{
1484    type Header = HeaderTy<N>;
1485
1486    fn local_tip_header(
1487        &self,
1488        highest_uninterrupted_block: BlockNumber,
1489    ) -> ProviderResult<SealedHeader<Self::Header>> {
1490        let static_file_provider = self.static_file_provider();
1491
1492        // Make sure Headers static file is at the same height. If it's further, this
1493        // input execution was interrupted previously and we need to unwind the static file.
1494        let next_static_file_block_num = static_file_provider
1495            .get_highest_static_file_block(StaticFileSegment::Headers)
1496            .map(|id| id + 1)
1497            .unwrap_or_default();
1498        let next_block = highest_uninterrupted_block + 1;
1499
1500        match next_static_file_block_num.cmp(&next_block) {
1501            // The node shutdown between an executed static file commit and before the database
1502            // commit, so we need to unwind the static files.
1503            Ordering::Greater => {
1504                let mut static_file_producer =
1505                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1506                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1507                // Since this is a database <-> static file inconsistency, we commit the change
1508                // straight away.
1509                static_file_producer.commit()?
1510            }
1511            Ordering::Less => {
1512                // There's either missing or corrupted files.
1513                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1514            }
1515            Ordering::Equal => {}
1516        }
1517
1518        let local_head = static_file_provider
1519            .sealed_header(highest_uninterrupted_block)?
1520            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1521
1522        Ok(local_head)
1523    }
1524}
1525
1526impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1527    type Header = HeaderTy<N>;
1528
1529    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1530        if let Some(num) = self.block_number(block_hash)? {
1531            Ok(self.header_by_number(num)?)
1532        } else {
1533            Ok(None)
1534        }
1535    }
1536
1537    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1538        self.static_file_provider.header_by_number(num)
1539    }
1540
1541    fn headers_range(
1542        &self,
1543        range: impl RangeBounds<BlockNumber>,
1544    ) -> ProviderResult<Vec<Self::Header>> {
1545        self.static_file_provider.headers_range(range)
1546    }
1547
1548    fn sealed_header(
1549        &self,
1550        number: BlockNumber,
1551    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1552        self.static_file_provider.sealed_header(number)
1553    }
1554
1555    fn sealed_headers_while(
1556        &self,
1557        range: impl RangeBounds<BlockNumber>,
1558        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1559    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1560        self.static_file_provider.sealed_headers_while(range, predicate)
1561    }
1562}
1563
1564impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1565    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1566        self.static_file_provider.block_hash(number)
1567    }
1568
1569    fn canonical_hashes_range(
1570        &self,
1571        start: BlockNumber,
1572        end: BlockNumber,
1573    ) -> ProviderResult<Vec<B256>> {
1574        self.static_file_provider.canonical_hashes_range(start, end)
1575    }
1576}
1577
1578impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1579    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1580        let best_number = self.best_block_number()?;
1581        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1582        Ok(ChainInfo { best_hash, best_number })
1583    }
1584
1585    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1586        // The best block number is tracked via the finished stage which gets updated in the same tx
1587        // when new blocks committed
1588        Ok(self
1589            .get_stage_checkpoint(StageId::Finish)?
1590            .map(|checkpoint| checkpoint.block_number)
1591            .unwrap_or_default())
1592    }
1593
1594    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1595        self.static_file_provider.last_block_number()
1596    }
1597
1598    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1599        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1600    }
1601}
1602
1603impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1604    type Block = BlockTy<N>;
1605
1606    fn find_block_by_hash(
1607        &self,
1608        hash: B256,
1609        source: BlockSource,
1610    ) -> ProviderResult<Option<Self::Block>> {
1611        if source.is_canonical() {
1612            self.block(hash.into())
1613        } else {
1614            Ok(None)
1615        }
1616    }
1617
1618    /// Returns the block with matching number from database.
1619    ///
1620    /// If the header for this block is not found, this returns `None`.
1621    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1622    /// will return None.
1623    ///
1624    /// Returns an error if the requested block is below the earliest available history.
1625    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1626        if let Some(number) = self.convert_hash_or_number(id)? {
1627            let earliest_available = self.static_file_provider.earliest_history_height();
1628            if number < earliest_available {
1629                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1630            }
1631
1632            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1633
1634            // If the body indices are not found, this means that the transactions either do not
1635            // exist in the database yet, or they do exit but are not indexed.
1636            // If they exist but are not indexed, we don't have enough
1637            // information to return the block anyways, so we return `None`.
1638            let Some(transactions) = self.transactions_by_block(number.into())? else {
1639                return Ok(None)
1640            };
1641
1642            let body = self
1643                .storage
1644                .reader()
1645                .read_block_bodies(self, vec![(&header, transactions)])?
1646                .pop()
1647                .ok_or(ProviderError::InvalidStorageOutput)?;
1648
1649            return Ok(Some(Self::Block::new(header, body)))
1650        }
1651
1652        Ok(None)
1653    }
1654
1655    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1656        Ok(None)
1657    }
1658
1659    fn pending_block_and_receipts(
1660        &self,
1661    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1662        Ok(None)
1663    }
1664
1665    /// Returns the block with senders with matching number or hash from database.
1666    ///
1667    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1668    /// spot, and we want fast querying.**
1669    ///
1670    /// If the header for this block is not found, this returns `None`.
1671    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1672    /// will return None.
1673    fn recovered_block(
1674        &self,
1675        id: BlockHashOrNumber,
1676        transaction_kind: TransactionVariant,
1677    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1678        self.recovered_block(
1679            id,
1680            transaction_kind,
1681            |block_number| self.header_by_number(block_number),
1682            |header, body, senders| {
1683                Self::Block::new(header, body)
1684                    // Note: we're using unchecked here because we know the block contains valid txs
1685                    // wrt to its height and can ignore the s value check so pre
1686                    // EIP-2 txs are allowed
1687                    .try_into_recovered_unchecked(senders)
1688                    .map(Some)
1689                    .map_err(|_| ProviderError::SenderRecoveryError)
1690            },
1691        )
1692    }
1693
1694    fn sealed_block_with_senders(
1695        &self,
1696        id: BlockHashOrNumber,
1697        transaction_kind: TransactionVariant,
1698    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1699        self.recovered_block(
1700            id,
1701            transaction_kind,
1702            |block_number| self.sealed_header(block_number),
1703            |header, body, senders| {
1704                Self::Block::new_sealed(header, body)
1705                    // Note: we're using unchecked here because we know the block contains valid txs
1706                    // wrt to its height and can ignore the s value check so pre
1707                    // EIP-2 txs are allowed
1708                    .try_with_senders_unchecked(senders)
1709                    .map(Some)
1710                    .map_err(|_| ProviderError::SenderRecoveryError)
1711            },
1712        )
1713    }
1714
1715    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1716        self.block_range(
1717            range,
1718            |range| self.headers_range(range),
1719            |header, body, _| Ok(Self::Block::new(header, body)),
1720        )
1721    }
1722
1723    fn block_with_senders_range(
1724        &self,
1725        range: RangeInclusive<BlockNumber>,
1726    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1727        self.block_with_senders_range(
1728            range,
1729            |range| self.headers_range(range),
1730            |header, body, senders| {
1731                Self::Block::new(header, body)
1732                    .try_into_recovered_unchecked(senders)
1733                    .map_err(|_| ProviderError::SenderRecoveryError)
1734            },
1735        )
1736    }
1737
1738    fn recovered_block_range(
1739        &self,
1740        range: RangeInclusive<BlockNumber>,
1741    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1742        self.block_with_senders_range(
1743            range,
1744            |range| self.sealed_headers_range(range),
1745            |header, body, senders| {
1746                Self::Block::new_sealed(header, body)
1747                    .try_with_senders(senders)
1748                    .map_err(|_| ProviderError::SenderRecoveryError)
1749            },
1750        )
1751    }
1752
1753    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1754        Ok(self
1755            .tx
1756            .cursor_read::<tables::TransactionBlocks>()?
1757            .seek(id)
1758            .map(|b| b.map(|(_, bn)| bn))?)
1759    }
1760}
1761
1762impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1763    for DatabaseProvider<TX, N>
1764{
1765    /// Recovers transaction hashes by walking through `Transactions` table and
1766    /// calculating them in a parallel manner. Returned unsorted.
1767    fn transaction_hashes_by_range(
1768        &self,
1769        tx_range: Range<TxNumber>,
1770    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1771        self.static_file_provider.transaction_hashes_by_range(tx_range)
1772    }
1773}
1774
1775// Calculates the hash of the given transaction
1776impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1777    type Transaction = TxTy<N>;
1778
1779    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1780        self.with_rocksdb_tx(|tx_ref| {
1781            let mut reader = EitherReader::new_transaction_hash_numbers(self, tx_ref)?;
1782            reader.get_transaction_hash_number(tx_hash)
1783        })
1784    }
1785
1786    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1787        self.static_file_provider.transaction_by_id(id)
1788    }
1789
1790    fn transaction_by_id_unhashed(
1791        &self,
1792        id: TxNumber,
1793    ) -> ProviderResult<Option<Self::Transaction>> {
1794        self.static_file_provider.transaction_by_id_unhashed(id)
1795    }
1796
1797    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1798        if let Some(id) = self.transaction_id(hash)? {
1799            Ok(self.transaction_by_id_unhashed(id)?)
1800        } else {
1801            Ok(None)
1802        }
1803    }
1804
1805    fn transaction_by_hash_with_meta(
1806        &self,
1807        tx_hash: TxHash,
1808    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1809        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1810            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1811            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1812            let Some(sealed_header) = self.sealed_header(block_number)?
1813        {
1814            let (header, block_hash) = sealed_header.split();
1815            if let Some(block_body) = self.block_body_indices(block_number)? {
1816                // the index of the tx in the block is the offset:
1817                // len([start..tx_id])
1818                // NOTE: `transaction_id` is always `>=` the block's first
1819                // index
1820                let index = transaction_id - block_body.first_tx_num();
1821
1822                let meta = TransactionMeta {
1823                    tx_hash,
1824                    index,
1825                    block_hash,
1826                    block_number,
1827                    base_fee: header.base_fee_per_gas(),
1828                    excess_blob_gas: header.excess_blob_gas(),
1829                    timestamp: header.timestamp(),
1830                };
1831
1832                return Ok(Some((transaction, meta)))
1833            }
1834        }
1835
1836        Ok(None)
1837    }
1838
1839    fn transactions_by_block(
1840        &self,
1841        id: BlockHashOrNumber,
1842    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1843        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1844            let Some(body) = self.block_body_indices(block_number)?
1845        {
1846            let tx_range = body.tx_num_range();
1847            return if tx_range.is_empty() {
1848                Ok(Some(Vec::new()))
1849            } else {
1850                self.transactions_by_tx_range(tx_range).map(Some)
1851            }
1852        }
1853        Ok(None)
1854    }
1855
1856    fn transactions_by_block_range(
1857        &self,
1858        range: impl RangeBounds<BlockNumber>,
1859    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1860        let range = to_range(range);
1861
1862        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1863            .into_iter()
1864            .map(|body| {
1865                let tx_num_range = body.tx_num_range();
1866                if tx_num_range.is_empty() {
1867                    Ok(Vec::new())
1868                } else {
1869                    self.transactions_by_tx_range(tx_num_range)
1870                }
1871            })
1872            .collect()
1873    }
1874
1875    fn transactions_by_tx_range(
1876        &self,
1877        range: impl RangeBounds<TxNumber>,
1878    ) -> ProviderResult<Vec<Self::Transaction>> {
1879        self.static_file_provider.transactions_by_tx_range(range)
1880    }
1881
1882    fn senders_by_tx_range(
1883        &self,
1884        range: impl RangeBounds<TxNumber>,
1885    ) -> ProviderResult<Vec<Address>> {
1886        if EitherWriterDestination::senders(self).is_static_file() {
1887            self.static_file_provider.senders_by_tx_range(range)
1888        } else {
1889            self.cursor_read_collect::<tables::TransactionSenders>(range)
1890        }
1891    }
1892
1893    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1894        if EitherWriterDestination::senders(self).is_static_file() {
1895            self.static_file_provider.transaction_sender(id)
1896        } else {
1897            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1898        }
1899    }
1900}
1901
1902impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1903    type Receipt = ReceiptTy<N>;
1904
1905    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1906        self.static_file_provider.get_with_static_file_or_database(
1907            StaticFileSegment::Receipts,
1908            id,
1909            |static_file| static_file.receipt(id),
1910            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1911        )
1912    }
1913
1914    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1915        if let Some(id) = self.transaction_id(hash)? {
1916            self.receipt(id)
1917        } else {
1918            Ok(None)
1919        }
1920    }
1921
1922    fn receipts_by_block(
1923        &self,
1924        block: BlockHashOrNumber,
1925    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1926        if let Some(number) = self.convert_hash_or_number(block)? &&
1927            let Some(body) = self.block_body_indices(number)?
1928        {
1929            let tx_range = body.tx_num_range();
1930            return if tx_range.is_empty() {
1931                Ok(Some(Vec::new()))
1932            } else {
1933                self.receipts_by_tx_range(tx_range).map(Some)
1934            }
1935        }
1936        Ok(None)
1937    }
1938
1939    fn receipts_by_tx_range(
1940        &self,
1941        range: impl RangeBounds<TxNumber>,
1942    ) -> ProviderResult<Vec<Self::Receipt>> {
1943        self.static_file_provider.get_range_with_static_file_or_database(
1944            StaticFileSegment::Receipts,
1945            to_range(range),
1946            |static_file, range, _| static_file.receipts_by_tx_range(range),
1947            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1948            |_| true,
1949        )
1950    }
1951
1952    fn receipts_by_block_range(
1953        &self,
1954        block_range: RangeInclusive<BlockNumber>,
1955    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1956        if block_range.is_empty() {
1957            return Ok(Vec::new());
1958        }
1959
1960        // collect block body indices for each block in the range
1961        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
1962        let mut block_body_indices = Vec::with_capacity(range_len);
1963        for block_num in block_range {
1964            if let Some(indices) = self.block_body_indices(block_num)? {
1965                block_body_indices.push(indices);
1966            } else {
1967                // use default indices for missing blocks (empty block)
1968                block_body_indices.push(StoredBlockBodyIndices::default());
1969            }
1970        }
1971
1972        if block_body_indices.is_empty() {
1973            return Ok(Vec::new());
1974        }
1975
1976        // find blocks with transactions to determine transaction range
1977        let non_empty_blocks: Vec<_> =
1978            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1979
1980        if non_empty_blocks.is_empty() {
1981            // all blocks are empty
1982            return Ok(vec![Vec::new(); block_body_indices.len()]);
1983        }
1984
1985        // calculate the overall transaction range
1986        let first_tx = non_empty_blocks[0].first_tx_num();
1987        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1988
1989        // fetch all receipts in the transaction range
1990        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1991        let mut receipts_iter = all_receipts.into_iter();
1992
1993        // distribute receipts to their respective blocks
1994        let mut result = Vec::with_capacity(block_body_indices.len());
1995        for indices in &block_body_indices {
1996            if indices.tx_count == 0 {
1997                result.push(Vec::new());
1998            } else {
1999                let block_receipts =
2000                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2001                result.push(block_receipts);
2002            }
2003        }
2004
2005        Ok(result)
2006    }
2007}
2008
2009impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2010    for DatabaseProvider<TX, N>
2011{
2012    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2013        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2014    }
2015
2016    fn block_body_indices_range(
2017        &self,
2018        range: RangeInclusive<BlockNumber>,
2019    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2020        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2021    }
2022}
2023
2024impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2025    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2026        Ok(if let Some(encoded) = id.get_pre_encoded() {
2027            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2028        } else {
2029            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2030        })
2031    }
2032
2033    /// Get stage checkpoint progress.
2034    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2035        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2036    }
2037
2038    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2039        self.tx
2040            .cursor_read::<tables::StageCheckpoints>()?
2041            .walk(None)?
2042            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2043            .map_err(ProviderError::Database)
2044    }
2045}
2046
2047impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2048    /// Save stage checkpoint.
2049    fn save_stage_checkpoint(
2050        &self,
2051        id: StageId,
2052        checkpoint: StageCheckpoint,
2053    ) -> ProviderResult<()> {
2054        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2055    }
2056
2057    /// Save stage checkpoint progress.
2058    fn save_stage_checkpoint_progress(
2059        &self,
2060        id: StageId,
2061        checkpoint: Vec<u8>,
2062    ) -> ProviderResult<()> {
2063        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2064    }
2065
2066    #[instrument(level = "debug", target = "providers::db", skip_all)]
2067    fn update_pipeline_stages(
2068        &self,
2069        block_number: BlockNumber,
2070        drop_stage_checkpoint: bool,
2071    ) -> ProviderResult<()> {
2072        // iterate over all existing stages in the table and update its progress.
2073        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2074        for stage_id in StageId::ALL {
2075            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2076            cursor.upsert(
2077                stage_id.to_string(),
2078                &StageCheckpoint {
2079                    block_number,
2080                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2081                },
2082            )?;
2083        }
2084
2085        Ok(())
2086    }
2087}
2088
2089impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2090    fn plain_state_storages(
2091        &self,
2092        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2093    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2094        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2095
2096        addresses_with_keys
2097            .into_iter()
2098            .map(|(address, storage)| {
2099                storage
2100                    .into_iter()
2101                    .map(|key| -> ProviderResult<_> {
2102                        Ok(plain_storage
2103                            .seek_by_key_subkey(address, key)?
2104                            .filter(|v| v.key == key)
2105                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2106                    })
2107                    .collect::<ProviderResult<Vec<_>>>()
2108                    .map(|storage| (address, storage))
2109            })
2110            .collect::<ProviderResult<Vec<(_, _)>>>()
2111    }
2112
2113    fn changed_storages_with_range(
2114        &self,
2115        range: RangeInclusive<BlockNumber>,
2116    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2117        if self.cached_storage_settings().storage_changesets_in_static_files {
2118            self.storage_changesets_range(range)?.into_iter().try_fold(
2119                BTreeMap::new(),
2120                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2121                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2122                    accounts.entry(address).or_default().insert(storage_entry.key);
2123                    Ok(accounts)
2124                },
2125            )
2126        } else {
2127            self.tx
2128                .cursor_read::<tables::StorageChangeSets>()?
2129                .walk_range(BlockNumberAddress::range(range))?
2130                // fold all storages and save its old state so we can remove it from HashedStorage
2131                // it is needed as it is dup table.
2132                .try_fold(
2133                    BTreeMap::new(),
2134                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2135                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2136                        accounts.entry(address).or_default().insert(storage_entry.key);
2137                        Ok(accounts)
2138                    },
2139                )
2140        }
2141    }
2142
2143    fn changed_storages_and_blocks_with_range(
2144        &self,
2145        range: RangeInclusive<BlockNumber>,
2146    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2147        if self.cached_storage_settings().storage_changesets_in_static_files {
2148            self.storage_changesets_range(range)?.into_iter().try_fold(
2149                BTreeMap::new(),
2150                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2151                    storages
2152                        .entry((index.address(), storage.key))
2153                        .or_default()
2154                        .push(index.block_number());
2155                    Ok(storages)
2156                },
2157            )
2158        } else {
2159            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2160
2161            let storage_changeset_lists =
2162                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2163                    BTreeMap::new(),
2164                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2165                     entry|
2166                     -> ProviderResult<_> {
2167                        let (index, storage) = entry?;
2168                        storages
2169                            .entry((index.address(), storage.key))
2170                            .or_default()
2171                            .push(index.block_number());
2172                        Ok(storages)
2173                    },
2174                )?;
2175
2176            Ok(storage_changeset_lists)
2177        }
2178    }
2179}
2180
2181impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2182    for DatabaseProvider<TX, N>
2183{
2184    type Receipt = ReceiptTy<N>;
2185
2186    #[instrument(level = "debug", target = "providers::db", skip_all)]
2187    fn write_state<'a>(
2188        &self,
2189        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2190        is_value_known: OriginalValuesKnown,
2191        config: StateWriteConfig,
2192    ) -> ProviderResult<()> {
2193        let execution_outcome = execution_outcome.into();
2194        let first_block = execution_outcome.first_block();
2195
2196        let (plain_state, reverts) =
2197            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2198
2199        self.write_state_reverts(reverts, first_block, config)?;
2200        self.write_state_changes(plain_state)?;
2201
2202        if !config.write_receipts {
2203            return Ok(());
2204        }
2205
2206        let block_count = execution_outcome.len() as u64;
2207        let last_block = execution_outcome.last_block();
2208        let block_range = first_block..=last_block;
2209
2210        let tip = self.last_block_number()?.max(last_block);
2211
2212        // Fetch the first transaction number for each block in the range
2213        let block_indices: Vec<_> = self
2214            .block_body_indices_range(block_range)?
2215            .into_iter()
2216            .map(|b| b.first_tx_num)
2217            .collect();
2218
2219        // Ensure all expected blocks are present.
2220        if block_indices.len() < block_count as usize {
2221            let missing_blocks = block_count - block_indices.len() as u64;
2222            return Err(ProviderError::BlockBodyIndicesNotFound(
2223                last_block.saturating_sub(missing_blocks - 1),
2224            ));
2225        }
2226
2227        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2228
2229        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2230        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2231
2232        // All receipts from the last 128 blocks are required for blockchain tree, even with
2233        // [`PruneSegment::ContractLogs`].
2234        //
2235        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2236        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2237        // files yet. Once receipts exist in static files, we must continue writing to maintain
2238        // continuity and have no gaps.
2239        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2240            self.static_file_provider()
2241                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2242                .is_none()) &&
2243            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2244
2245        // Prepare set of addresses which logs should not be pruned.
2246        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
2247        for (_, addresses) in contract_log_pruner.range(..first_block) {
2248            allowed_addresses.extend(addresses.iter().copied());
2249        }
2250
2251        for (idx, (receipts, first_tx_index)) in
2252            execution_outcome.receipts().zip(block_indices).enumerate()
2253        {
2254            let block_number = first_block + idx as u64;
2255
2256            // Increment block number for receipts static file writer
2257            receipts_writer.increment_block(block_number)?;
2258
2259            // Skip writing receipts if pruning configuration requires us to.
2260            if prunable_receipts &&
2261                self.prune_modes
2262                    .receipts
2263                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2264            {
2265                continue
2266            }
2267
2268            // If there are new addresses to retain after this block number, track them
2269            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2270                allowed_addresses.extend(new_addresses.iter().copied());
2271            }
2272
2273            for (idx, receipt) in receipts.iter().enumerate() {
2274                let receipt_idx = first_tx_index + idx as u64;
2275                // Skip writing receipt if log filter is active and it does not have any logs to
2276                // retain
2277                if prunable_receipts &&
2278                    has_contract_log_filter &&
2279                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2280                {
2281                    continue
2282                }
2283
2284                receipts_writer.append_receipt(receipt_idx, receipt)?;
2285            }
2286        }
2287
2288        Ok(())
2289    }
2290
2291    fn write_state_reverts(
2292        &self,
2293        reverts: PlainStateReverts,
2294        first_block: BlockNumber,
2295        config: StateWriteConfig,
2296    ) -> ProviderResult<()> {
2297        // Write storage changes
2298        tracing::trace!("Writing storage changes");
2299        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2300        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2301            let block_number = first_block + block_index as BlockNumber;
2302
2303            tracing::trace!(block_number, "Writing block change");
2304            // sort changes by address.
2305            storage_changes.par_sort_unstable_by_key(|a| a.address);
2306            let total_changes =
2307                storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2308            let mut changeset = Vec::with_capacity(total_changes);
2309            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2310                let mut storage = storage_revert
2311                    .into_iter()
2312                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
2313                    .collect::<Vec<_>>();
2314                // sort storage slots by key.
2315                storage.par_sort_unstable_by_key(|a| a.0);
2316
2317                // If we are writing the primary storage wipe transition, the pre-existing plain
2318                // storage state has to be taken from the database and written to storage history.
2319                // See [StorageWipe::Primary] for more details.
2320                //
2321                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
2322                // collecting wiped entries into a Vec like this, see
2323                // `write_storage_trie_changesets`.
2324                let mut wiped_storage = Vec::new();
2325                if wiped {
2326                    tracing::trace!(?address, "Wiping storage");
2327                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2328                        wiped_storage.push((entry.key, entry.value));
2329                        while let Some(entry) = storages_cursor.next_dup_val()? {
2330                            wiped_storage.push((entry.key, entry.value))
2331                        }
2332                    }
2333                }
2334
2335                tracing::trace!(?address, ?storage, "Writing storage reverts");
2336                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2337                    changeset.push(StorageBeforeTx { address, key, value });
2338                }
2339            }
2340
2341            let mut storage_changesets_writer =
2342                EitherWriter::new_storage_changesets(self, block_number)?;
2343            storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2344        }
2345
2346        if !config.write_account_changesets {
2347            return Ok(());
2348        }
2349
2350        // Write account changes
2351        tracing::trace!(?first_block, "Writing account changes");
2352        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2353            let block_number = first_block + block_index as BlockNumber;
2354            let changeset = account_block_reverts
2355                .into_iter()
2356                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2357                .collect::<Vec<_>>();
2358            let mut account_changesets_writer =
2359                EitherWriter::new_account_changesets(self, block_number)?;
2360
2361            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2362        }
2363
2364        Ok(())
2365    }
2366
2367    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2368        // sort all entries so they can be written to database in more performant way.
2369        // and take smaller memory footprint.
2370        changes.accounts.par_sort_by_key(|a| a.0);
2371        changes.storage.par_sort_by_key(|a| a.address);
2372        changes.contracts.par_sort_by_key(|a| a.0);
2373
2374        // Write new account state
2375        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2376        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2377        // write account to database.
2378        for (address, account) in changes.accounts {
2379            if let Some(account) = account {
2380                tracing::trace!(?address, "Updating plain state account");
2381                accounts_cursor.upsert(address, &account.into())?;
2382            } else if accounts_cursor.seek_exact(address)?.is_some() {
2383                tracing::trace!(?address, "Deleting plain state account");
2384                accounts_cursor.delete_current()?;
2385            }
2386        }
2387
2388        // Write bytecode
2389        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2390        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
2391        for (hash, bytecode) in changes.contracts {
2392            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
2393        }
2394
2395        // Write new storage state and wipe storage if needed.
2396        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2397        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2398        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2399            // Wiping of storage.
2400            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2401                storages_cursor.delete_current_duplicates()?;
2402            }
2403            // cast storages to B256.
2404            let mut storage = storage
2405                .into_iter()
2406                .map(|(k, value)| StorageEntry { key: k.into(), value })
2407                .collect::<Vec<_>>();
2408            // sort storage slots by key.
2409            storage.par_sort_unstable_by_key(|a| a.key);
2410
2411            for entry in storage {
2412                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2413                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2414                    db_entry.key == entry.key
2415                {
2416                    storages_cursor.delete_current()?;
2417                }
2418
2419                if !entry.value.is_zero() {
2420                    storages_cursor.upsert(address, &entry)?;
2421                }
2422            }
2423        }
2424
2425        Ok(())
2426    }
2427
2428    #[instrument(level = "debug", target = "providers::db", skip_all)]
2429    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2430        // Write hashed account updates.
2431        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2432        for (hashed_address, account) in hashed_state.accounts() {
2433            if let Some(account) = account {
2434                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2435            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2436                hashed_accounts_cursor.delete_current()?;
2437            }
2438        }
2439
2440        // Write hashed storage changes.
2441        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2442        let mut hashed_storage_cursor =
2443            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2444        for (hashed_address, storage) in sorted_storages {
2445            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2446                hashed_storage_cursor.delete_current_duplicates()?;
2447            }
2448
2449            for (hashed_slot, value) in storage.storage_slots_ref() {
2450                let entry = StorageEntry { key: *hashed_slot, value: *value };
2451
2452                if let Some(db_entry) =
2453                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2454                    db_entry.key == entry.key
2455                {
2456                    hashed_storage_cursor.delete_current()?;
2457                }
2458
2459                if !entry.value.is_zero() {
2460                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2461                }
2462            }
2463        }
2464
2465        Ok(())
2466    }
2467
2468    /// Remove the last N blocks of state.
2469    ///
2470    /// The latest state will be unwound
2471    ///
2472    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2473    ///    transaction ids.
2474    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2475    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2476    ///    the changesets.
2477    ///    - In order to have both the old and new values in the changesets, we also access the
2478    ///      plain state tables.
2479    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2480    ///    we:
2481    ///     1. Take the old value from the changeset
2482    ///     2. Take the new value from the plain state
2483    ///     3. Save the old value to the local state
2484    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2485    ///    have seen before we:
2486    ///     1. Take the old value from the changeset
2487    ///     2. Take the new value from the local state
2488    ///     3. Set the local state to the value in the changeset
2489    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2490        let range = block + 1..=self.last_block_number()?;
2491
2492        if range.is_empty() {
2493            return Ok(());
2494        }
2495
2496        // We are not removing block meta as it is used to get block changesets.
2497        let block_bodies = self.block_body_indices_range(range.clone())?;
2498
2499        // get transaction receipts
2500        let from_transaction_num =
2501            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2502
2503        let storage_range = BlockNumberAddress::range(range.clone());
2504        let storage_changeset = if let Some(_highest_block) = self
2505            .static_file_provider
2506            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2507            self.cached_storage_settings().storage_changesets_in_static_files
2508        {
2509            let changesets = self.storage_changesets_range(range.clone())?;
2510            let mut changeset_writer =
2511                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2512            changeset_writer.prune_storage_changesets(block)?;
2513            changesets
2514        } else {
2515            self.take::<tables::StorageChangeSets>(storage_range)?
2516        };
2517        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2518
2519        // This is not working for blocks that are not at tip. as plain state is not the last
2520        // state of end range. We should rename the functions or add support to access
2521        // History state. Accessing history state can be tricky but we are not gaining
2522        // anything.
2523        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2524        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2525
2526        let (state, _) = self.populate_bundle_state(
2527            account_changeset,
2528            storage_changeset,
2529            &mut plain_accounts_cursor,
2530            &mut plain_storage_cursor,
2531        )?;
2532
2533        // iterate over local plain state remove all account and all storages.
2534        for (address, (old_account, new_account, storage)) in &state {
2535            // revert account if needed.
2536            if old_account != new_account {
2537                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2538                if let Some(account) = old_account {
2539                    plain_accounts_cursor.upsert(*address, account)?;
2540                } else if existing_entry.is_some() {
2541                    plain_accounts_cursor.delete_current()?;
2542                }
2543            }
2544
2545            // revert storages
2546            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2547                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2548                // delete previous value
2549                if plain_storage_cursor
2550                    .seek_by_key_subkey(*address, *storage_key)?
2551                    .filter(|s| s.key == *storage_key)
2552                    .is_some()
2553                {
2554                    plain_storage_cursor.delete_current()?
2555                }
2556
2557                // insert value if needed
2558                if !old_storage_value.is_zero() {
2559                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2560                }
2561            }
2562        }
2563
2564        self.remove_receipts_from(from_transaction_num, block)?;
2565
2566        Ok(())
2567    }
2568
2569    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2570    ///
2571    /// The latest state will be unwound and returned back with all the blocks
2572    ///
2573    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2574    ///    transaction ids.
2575    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2576    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2577    ///    the changesets.
2578    ///    - In order to have both the old and new values in the changesets, we also access the
2579    ///      plain state tables.
2580    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2581    ///    we:
2582    ///     1. Take the old value from the changeset
2583    ///     2. Take the new value from the plain state
2584    ///     3. Save the old value to the local state
2585    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2586    ///    have seen before we:
2587    ///     1. Take the old value from the changeset
2588    ///     2. Take the new value from the local state
2589    ///     3. Set the local state to the value in the changeset
2590    fn take_state_above(
2591        &self,
2592        block: BlockNumber,
2593    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2594        let range = block + 1..=self.last_block_number()?;
2595
2596        if range.is_empty() {
2597            return Ok(ExecutionOutcome::default())
2598        }
2599        let start_block_number = *range.start();
2600
2601        // We are not removing block meta as it is used to get block changesets.
2602        let block_bodies = self.block_body_indices_range(range.clone())?;
2603
2604        // get transaction receipts
2605        let from_transaction_num =
2606            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2607        let to_transaction_num =
2608            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2609
2610        let storage_range = BlockNumberAddress::range(range.clone());
2611        let storage_changeset = if let Some(highest_block) = self
2612            .static_file_provider
2613            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2614            self.cached_storage_settings().storage_changesets_in_static_files
2615        {
2616            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2617            let mut changeset_writer =
2618                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2619            changeset_writer.prune_storage_changesets(block)?;
2620            changesets
2621        } else {
2622            self.take::<tables::StorageChangeSets>(storage_range)?
2623        };
2624
2625        // This is not working for blocks that are not at tip. as plain state is not the last
2626        // state of end range. We should rename the functions or add support to access
2627        // History state. Accessing history state can be tricky but we are not gaining
2628        // anything.
2629        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2630        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2631
2632        // if there are static files for this segment, prune them.
2633        let highest_changeset_block = self
2634            .static_file_provider
2635            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2636        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2637            self.cached_storage_settings().account_changesets_in_static_files
2638        {
2639            // TODO: add a `take` method that removes and returns the items instead of doing this
2640            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2641            let mut changeset_writer =
2642                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2643            changeset_writer.prune_account_changesets(block)?;
2644
2645            changesets
2646        } else {
2647            // Have to remove from static files if they exist, otherwise remove using `take` for the
2648            // changeset tables
2649            self.take::<tables::AccountChangeSets>(range)?
2650        };
2651
2652        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2653        // remove, and return later
2654        let (state, reverts) = self.populate_bundle_state(
2655            account_changeset,
2656            storage_changeset,
2657            &mut plain_accounts_cursor,
2658            &mut plain_storage_cursor,
2659        )?;
2660
2661        // iterate over local plain state remove all account and all storages.
2662        for (address, (old_account, new_account, storage)) in &state {
2663            // revert account if needed.
2664            if old_account != new_account {
2665                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2666                if let Some(account) = old_account {
2667                    plain_accounts_cursor.upsert(*address, account)?;
2668                } else if existing_entry.is_some() {
2669                    plain_accounts_cursor.delete_current()?;
2670                }
2671            }
2672
2673            // revert storages
2674            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2675                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2676                // delete previous value
2677                if plain_storage_cursor
2678                    .seek_by_key_subkey(*address, *storage_key)?
2679                    .filter(|s| s.key == *storage_key)
2680                    .is_some()
2681                {
2682                    plain_storage_cursor.delete_current()?
2683                }
2684
2685                // insert value if needed
2686                if !old_storage_value.is_zero() {
2687                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2688                }
2689            }
2690        }
2691
2692        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2693        let mut receipts_iter = self
2694            .static_file_provider
2695            .get_range_with_static_file_or_database(
2696                StaticFileSegment::Receipts,
2697                from_transaction_num..to_transaction_num + 1,
2698                |static_file, range, _| {
2699                    static_file
2700                        .receipts_by_tx_range(range.clone())
2701                        .map(|r| range.into_iter().zip(r).collect())
2702                },
2703                |range, _| {
2704                    self.tx
2705                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2706                        .walk_range(range)?
2707                        .map(|r| r.map_err(Into::into))
2708                        .collect()
2709                },
2710                |_| true,
2711            )?
2712            .into_iter()
2713            .peekable();
2714
2715        let mut receipts = Vec::with_capacity(block_bodies.len());
2716        // loop break if we are at the end of the blocks.
2717        for block_body in block_bodies {
2718            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2719            for num in block_body.tx_num_range() {
2720                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2721                    block_receipts.push(receipts_iter.next().unwrap().1);
2722                }
2723            }
2724            receipts.push(block_receipts);
2725        }
2726
2727        self.remove_receipts_from(from_transaction_num, block)?;
2728
2729        Ok(ExecutionOutcome::new_init(
2730            state,
2731            reverts,
2732            Vec::new(),
2733            receipts,
2734            start_block_number,
2735            Vec::new(),
2736        ))
2737    }
2738}
2739
2740impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2741    /// Writes trie updates to the database with already sorted updates.
2742    ///
2743    /// Returns the number of entries modified.
2744    #[instrument(level = "debug", target = "providers::db", skip_all)]
2745    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2746        if trie_updates.is_empty() {
2747            return Ok(0)
2748        }
2749
2750        // Track the number of inserted entries.
2751        let mut num_entries = 0;
2752
2753        let tx = self.tx_ref();
2754        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2755
2756        // Process sorted account nodes
2757        for (key, updated_node) in trie_updates.account_nodes_ref() {
2758            let nibbles = StoredNibbles(*key);
2759            match updated_node {
2760                Some(node) => {
2761                    if !nibbles.0.is_empty() {
2762                        num_entries += 1;
2763                        account_trie_cursor.upsert(nibbles, node)?;
2764                    }
2765                }
2766                None => {
2767                    num_entries += 1;
2768                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2769                        account_trie_cursor.delete_current()?;
2770                    }
2771                }
2772            }
2773        }
2774
2775        num_entries +=
2776            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2777
2778        Ok(num_entries)
2779    }
2780}
2781
2782impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2783    /// Writes storage trie updates from the given storage trie map with already sorted updates.
2784    ///
2785    /// Expects the storage trie updates to already be sorted by the hashed address key.
2786    ///
2787    /// Returns the number of entries modified.
2788    fn write_storage_trie_updates_sorted<'a>(
2789        &self,
2790        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2791    ) -> ProviderResult<usize> {
2792        let mut num_entries = 0;
2793        let mut storage_tries = storage_tries.collect::<Vec<_>>();
2794        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2795        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2796        for (hashed_address, storage_trie_updates) in storage_tries {
2797            let mut db_storage_trie_cursor =
2798                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2799            num_entries +=
2800                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2801            cursor = db_storage_trie_cursor.cursor;
2802        }
2803
2804        Ok(num_entries)
2805    }
2806}
2807
2808impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2809    fn unwind_account_hashing<'a>(
2810        &self,
2811        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2812    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2813        // Aggregate all block changesets and make a list of accounts that have been changed.
2814        // Note that collecting and then reversing the order is necessary to ensure that the
2815        // changes are applied in the correct order.
2816        let hashed_accounts = changesets
2817            .into_iter()
2818            .map(|(_, e)| (keccak256(e.address), e.info))
2819            .collect::<Vec<_>>()
2820            .into_iter()
2821            .rev()
2822            .collect::<BTreeMap<_, _>>();
2823
2824        // Apply values to HashedState, and remove the account if it's None.
2825        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2826        for (hashed_address, account) in &hashed_accounts {
2827            if let Some(account) = account {
2828                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2829            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2830                hashed_accounts_cursor.delete_current()?;
2831            }
2832        }
2833
2834        Ok(hashed_accounts)
2835    }
2836
2837    fn unwind_account_hashing_range(
2838        &self,
2839        range: impl RangeBounds<BlockNumber>,
2840    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2841        let changesets = self
2842            .tx
2843            .cursor_read::<tables::AccountChangeSets>()?
2844            .walk_range(range)?
2845            .collect::<Result<Vec<_>, _>>()?;
2846        self.unwind_account_hashing(changesets.iter())
2847    }
2848
2849    fn insert_account_for_hashing(
2850        &self,
2851        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2852    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2853        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2854        let hashed_accounts =
2855            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2856        for (hashed_address, account) in &hashed_accounts {
2857            if let Some(account) = account {
2858                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2859            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2860                hashed_accounts_cursor.delete_current()?;
2861            }
2862        }
2863        Ok(hashed_accounts)
2864    }
2865
2866    fn unwind_storage_hashing(
2867        &self,
2868        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2869    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2870        // Aggregate all block changesets and make list of accounts that have been changed.
2871        let mut hashed_storages = changesets
2872            .into_iter()
2873            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2874                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2875            })
2876            .collect::<Vec<_>>();
2877        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2878
2879        // Apply values to HashedState, and remove the account if it's None.
2880        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2881            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2882        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2883        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2884            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2885
2886            if hashed_storage
2887                .seek_by_key_subkey(hashed_address, key)?
2888                .filter(|entry| entry.key == key)
2889                .is_some()
2890            {
2891                hashed_storage.delete_current()?;
2892            }
2893
2894            if !value.is_zero() {
2895                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2896            }
2897        }
2898        Ok(hashed_storage_keys)
2899    }
2900
2901    fn unwind_storage_hashing_range(
2902        &self,
2903        range: impl RangeBounds<BlockNumberAddress>,
2904    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2905        let changesets = self
2906            .tx
2907            .cursor_read::<tables::StorageChangeSets>()?
2908            .walk_range(range)?
2909            .collect::<Result<Vec<_>, _>>()?;
2910        self.unwind_storage_hashing(changesets.into_iter())
2911    }
2912
2913    fn insert_storage_for_hashing(
2914        &self,
2915        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2916    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2917        // hash values
2918        let hashed_storages =
2919            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2920                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2921                    map.insert(keccak256(entry.key), entry.value);
2922                    map
2923                });
2924                map.insert(keccak256(address), storage);
2925                map
2926            });
2927
2928        let hashed_storage_keys = hashed_storages
2929            .iter()
2930            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2931            .collect();
2932
2933        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2934        // Hash the address and key and apply them to HashedStorage (if Storage is None
2935        // just remove it);
2936        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2937            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2938                if hashed_storage_cursor
2939                    .seek_by_key_subkey(hashed_address, key)?
2940                    .filter(|entry| entry.key == key)
2941                    .is_some()
2942                {
2943                    hashed_storage_cursor.delete_current()?;
2944                }
2945
2946                if !value.is_zero() {
2947                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2948                }
2949                Ok(())
2950            })
2951        })?;
2952
2953        Ok(hashed_storage_keys)
2954    }
2955}
2956
2957impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2958    fn unwind_account_history_indices<'a>(
2959        &self,
2960        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2961    ) -> ProviderResult<usize> {
2962        let mut last_indices = changesets
2963            .into_iter()
2964            .map(|(index, account)| (account.address, *index))
2965            .collect::<Vec<_>>();
2966        last_indices.sort_unstable_by_key(|(a, _)| *a);
2967
2968        if self.cached_storage_settings().account_history_in_rocksdb {
2969            #[cfg(all(unix, feature = "rocksdb"))]
2970            {
2971                let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
2972                self.pending_rocksdb_batches.lock().push(batch);
2973            }
2974        } else {
2975            // Unwind the account history index in MDBX.
2976            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2977            for &(address, rem_index) in &last_indices {
2978                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2979                    &mut cursor,
2980                    ShardedKey::last(address),
2981                    rem_index,
2982                    |sharded_key| sharded_key.key == address,
2983                )?;
2984
2985                // Check the last returned partial shard.
2986                // If it's not empty, the shard needs to be reinserted.
2987                if !partial_shard.is_empty() {
2988                    cursor.insert(
2989                        ShardedKey::last(address),
2990                        &BlockNumberList::new_pre_sorted(partial_shard),
2991                    )?;
2992                }
2993            }
2994        }
2995
2996        let changesets = last_indices.len();
2997        Ok(changesets)
2998    }
2999
3000    fn unwind_account_history_indices_range(
3001        &self,
3002        range: impl RangeBounds<BlockNumber>,
3003    ) -> ProviderResult<usize> {
3004        let changesets = self
3005            .tx
3006            .cursor_read::<tables::AccountChangeSets>()?
3007            .walk_range(range)?
3008            .collect::<Result<Vec<_>, _>>()?;
3009        self.unwind_account_history_indices(changesets.iter())
3010    }
3011
3012    fn insert_account_history_index(
3013        &self,
3014        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3015    ) -> ProviderResult<()> {
3016        self.append_history_index::<_, tables::AccountsHistory>(
3017            account_transitions,
3018            ShardedKey::new,
3019        )
3020    }
3021
3022    fn unwind_storage_history_indices(
3023        &self,
3024        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3025    ) -> ProviderResult<usize> {
3026        let mut storage_changesets = changesets
3027            .into_iter()
3028            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3029            .collect::<Vec<_>>();
3030        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
3031
3032        if self.cached_storage_settings().storages_history_in_rocksdb {
3033            #[cfg(all(unix, feature = "rocksdb"))]
3034            {
3035                let batch =
3036                    self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3037                self.pending_rocksdb_batches.lock().push(batch);
3038            }
3039        } else {
3040            // Unwind the storage history index in MDBX.
3041            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3042            for &(address, storage_key, rem_index) in &storage_changesets {
3043                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3044                    &mut cursor,
3045                    StorageShardedKey::last(address, storage_key),
3046                    rem_index,
3047                    |storage_sharded_key| {
3048                        storage_sharded_key.address == address &&
3049                            storage_sharded_key.sharded_key.key == storage_key
3050                    },
3051                )?;
3052
3053                // Check the last returned partial shard.
3054                // If it's not empty, the shard needs to be reinserted.
3055                if !partial_shard.is_empty() {
3056                    cursor.insert(
3057                        StorageShardedKey::last(address, storage_key),
3058                        &BlockNumberList::new_pre_sorted(partial_shard),
3059                    )?;
3060                }
3061            }
3062        }
3063
3064        let changesets = storage_changesets.len();
3065        Ok(changesets)
3066    }
3067
3068    fn unwind_storage_history_indices_range(
3069        &self,
3070        range: impl RangeBounds<BlockNumberAddress>,
3071    ) -> ProviderResult<usize> {
3072        let changesets = self
3073            .tx
3074            .cursor_read::<tables::StorageChangeSets>()?
3075            .walk_range(range)?
3076            .collect::<Result<Vec<_>, _>>()?;
3077        self.unwind_storage_history_indices(changesets.into_iter())
3078    }
3079
3080    fn insert_storage_history_index(
3081        &self,
3082        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3083    ) -> ProviderResult<()> {
3084        self.append_history_index::<_, tables::StoragesHistory>(
3085            storage_transitions,
3086            |(address, storage_key), highest_block_number| {
3087                StorageShardedKey::new(address, storage_key, highest_block_number)
3088            },
3089        )
3090    }
3091
3092    #[instrument(level = "debug", target = "providers::db", skip_all)]
3093    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3094        let storage_settings = self.cached_storage_settings();
3095        if !storage_settings.account_history_in_rocksdb {
3096            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3097            self.insert_account_history_index(indices)?;
3098        }
3099
3100        if !storage_settings.storages_history_in_rocksdb {
3101            let indices = self.changed_storages_and_blocks_with_range(range)?;
3102            self.insert_storage_history_index(indices)?;
3103        }
3104
3105        Ok(())
3106    }
3107}
3108
3109impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3110    for DatabaseProvider<TX, N>
3111{
3112    fn take_block_and_execution_above(
3113        &self,
3114        block: BlockNumber,
3115    ) -> ProviderResult<Chain<Self::Primitives>> {
3116        let range = block + 1..=self.last_block_number()?;
3117
3118        self.unwind_trie_state_from(block + 1)?;
3119
3120        // get execution res
3121        let execution_state = self.take_state_above(block)?;
3122
3123        let blocks = self.recovered_block_range(range)?;
3124
3125        // remove block bodies it is needed for both get block range and get block execution results
3126        // that is why it is deleted afterwards.
3127        self.remove_blocks_above(block)?;
3128
3129        // Update pipeline progress
3130        self.update_pipeline_stages(block, true)?;
3131
3132        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3133    }
3134
3135    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3136        self.unwind_trie_state_from(block + 1)?;
3137
3138        // remove execution res
3139        self.remove_state_above(block)?;
3140
3141        // remove block bodies it is needed for both get block range and get block execution results
3142        // that is why it is deleted afterwards.
3143        self.remove_blocks_above(block)?;
3144
3145        // Update pipeline progress
3146        self.update_pipeline_stages(block, true)?;
3147
3148        Ok(())
3149    }
3150}
3151
3152impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3153    for DatabaseProvider<TX, N>
3154{
3155    type Block = BlockTy<N>;
3156    type Receipt = ReceiptTy<N>;
3157
3158    /// Inserts the block into the database, writing to both static files and MDBX.
3159    ///
3160    /// This is a convenience method primarily used in tests. For production use,
3161    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3162    fn insert_block(
3163        &self,
3164        block: &RecoveredBlock<Self::Block>,
3165    ) -> ProviderResult<StoredBlockBodyIndices> {
3166        let block_number = block.number();
3167
3168        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3169        let executed_block = ExecutedBlock::new(
3170            Arc::new(block.clone()),
3171            Arc::new(BlockExecutionOutput {
3172                result: BlockExecutionResult {
3173                    receipts: Default::default(),
3174                    requests: Default::default(),
3175                    gas_used: 0,
3176                    blob_gas_used: 0,
3177                },
3178                state: Default::default(),
3179            }),
3180            ComputedTrieData::default(),
3181        );
3182
3183        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3184        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3185
3186        // Return the body indices
3187        self.block_body_indices(block_number)?
3188            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3189    }
3190
3191    fn append_block_bodies(
3192        &self,
3193        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3194    ) -> ProviderResult<()> {
3195        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3196
3197        // Initialize writer if we will be writing transactions to staticfiles
3198        let mut tx_writer =
3199            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3200
3201        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3202        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3203
3204        // Get id for the next tx_num or zero if there are no transactions.
3205        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3206
3207        for (block_number, body) in &bodies {
3208            // Increment block on static file header.
3209            tx_writer.increment_block(*block_number)?;
3210
3211            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3212            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3213
3214            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3215
3216            // insert block meta
3217            block_indices_cursor.append(*block_number, &block_indices)?;
3218
3219            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3220
3221            let Some(body) = body else { continue };
3222
3223            // write transaction block index
3224            if !body.transactions().is_empty() {
3225                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3226                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3227            }
3228
3229            // write transactions
3230            for transaction in body.transactions() {
3231                tx_writer.append_transaction(next_tx_num, transaction)?;
3232
3233                // Increment transaction id for each transaction.
3234                next_tx_num += 1;
3235            }
3236        }
3237
3238        self.storage.writer().write_block_bodies(self, bodies)?;
3239
3240        Ok(())
3241    }
3242
3243    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3244        let last_block_number = self.last_block_number()?;
3245        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3246        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3247            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3248        }
3249
3250        // Get highest static file block for the total block range
3251        let highest_static_file_block = self
3252            .static_file_provider()
3253            .get_highest_static_file_block(StaticFileSegment::Headers)
3254            .expect("todo: error handling, headers should exist");
3255
3256        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3257        // we remove only what is ABOVE the block.
3258        //
3259        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3260        // will have three blocks to remove, which will be block 8, 7, and 6.
3261        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3262        self.static_file_provider()
3263            .get_writer(block, StaticFileSegment::Headers)?
3264            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3265
3266        // First transaction to be removed
3267        let unwind_tx_from = self
3268            .block_body_indices(block)?
3269            .map(|b| b.next_tx_num())
3270            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3271
3272        // Last transaction to be removed
3273        let unwind_tx_to = self
3274            .tx
3275            .cursor_read::<tables::BlockBodyIndices>()?
3276            .last()?
3277            // shouldn't happen because this was OK above
3278            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3279            .1
3280            .last_tx_num();
3281
3282        if unwind_tx_from <= unwind_tx_to {
3283            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3284            self.with_rocksdb_batch(|batch| {
3285                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3286                for (hash, _) in hashes {
3287                    writer.delete_transaction_hash_number(hash)?;
3288                }
3289                Ok(((), writer.into_raw_rocksdb_batch()))
3290            })?;
3291        }
3292
3293        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
3294
3295        self.remove_bodies_above(block)?;
3296
3297        Ok(())
3298    }
3299
3300    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3301        self.storage.writer().remove_block_bodies_above(self, block)?;
3302
3303        // First transaction to be removed
3304        let unwind_tx_from = self
3305            .block_body_indices(block)?
3306            .map(|b| b.next_tx_num())
3307            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3308
3309        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3310        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3311
3312        let static_file_tx_num =
3313            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3314
3315        let to_delete = static_file_tx_num
3316            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3317            .unwrap_or_default();
3318
3319        self.static_file_provider
3320            .latest_writer(StaticFileSegment::Transactions)?
3321            .prune_transactions(to_delete, block)?;
3322
3323        Ok(())
3324    }
3325
3326    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3327    fn append_blocks_with_state(
3328        &self,
3329        blocks: Vec<RecoveredBlock<Self::Block>>,
3330        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3331        hashed_state: HashedPostStateSorted,
3332    ) -> ProviderResult<()> {
3333        if blocks.is_empty() {
3334            debug!(target: "providers::db", "Attempted to append empty block range");
3335            return Ok(())
3336        }
3337
3338        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3339        // `None`.
3340        let first_number = blocks[0].number();
3341
3342        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3343        // `None`.
3344        let last_block_number = blocks[blocks.len() - 1].number();
3345
3346        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3347
3348        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3349        // This is necessary because with edge storage, changesets are written to static files
3350        // whose index isn't updated until commit, making them invisible to subsequent reads
3351        // within the same transaction.
3352        let (account_transitions, storage_transitions) = {
3353            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3354            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3355            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3356                let block_number = first_number + block_idx as u64;
3357                for (address, account_revert) in block_reverts {
3358                    account_transitions.entry(*address).or_default().push(block_number);
3359                    for storage_key in account_revert.storage.keys() {
3360                        let key = B256::new(storage_key.to_be_bytes());
3361                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3362                    }
3363                }
3364            }
3365            (account_transitions, storage_transitions)
3366        };
3367
3368        // Insert the blocks
3369        for block in blocks {
3370            self.insert_block(&block)?;
3371            durations_recorder.record_relative(metrics::Action::InsertBlock);
3372        }
3373
3374        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3375        durations_recorder.record_relative(metrics::Action::InsertState);
3376
3377        // insert hashes and intermediate merkle nodes
3378        self.write_hashed_state(&hashed_state)?;
3379        durations_recorder.record_relative(metrics::Action::InsertHashes);
3380
3381        // Use pre-computed transitions for history indices since static file
3382        // writes aren't visible until commit.
3383        self.insert_account_history_index(account_transitions)?;
3384        self.insert_storage_history_index(storage_transitions)?;
3385        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3386
3387        // Update pipeline progress
3388        self.update_pipeline_stages(last_block_number, false)?;
3389        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3390
3391        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3392
3393        Ok(())
3394    }
3395}
3396
3397impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3398    fn get_prune_checkpoint(
3399        &self,
3400        segment: PruneSegment,
3401    ) -> ProviderResult<Option<PruneCheckpoint>> {
3402        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3403    }
3404
3405    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3406        Ok(PruneSegment::variants()
3407            .filter_map(|segment| {
3408                self.tx
3409                    .get::<tables::PruneCheckpoints>(segment)
3410                    .transpose()
3411                    .map(|chk| chk.map(|chk| (segment, chk)))
3412            })
3413            .collect::<Result<_, _>>()?)
3414    }
3415}
3416
3417impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3418    fn save_prune_checkpoint(
3419        &self,
3420        segment: PruneSegment,
3421        checkpoint: PruneCheckpoint,
3422    ) -> ProviderResult<()> {
3423        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3424    }
3425}
3426
3427impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3428    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3429        let db_entries = self.tx.entries::<T>()?;
3430        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3431            Ok(entries) => entries,
3432            Err(ProviderError::UnsupportedProvider) => 0,
3433            Err(err) => return Err(err),
3434        };
3435
3436        Ok(db_entries + static_file_entries)
3437    }
3438}
3439
3440impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3441    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3442        let mut finalized_blocks = self
3443            .tx
3444            .cursor_read::<tables::ChainState>()?
3445            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3446            .take(1)
3447            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3448
3449        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3450        Ok(last_finalized_block_number)
3451    }
3452
3453    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3454        let mut finalized_blocks = self
3455            .tx
3456            .cursor_read::<tables::ChainState>()?
3457            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3458            .take(1)
3459            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3460
3461        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3462        Ok(last_finalized_block_number)
3463    }
3464}
3465
3466impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3467    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3468        Ok(self
3469            .tx
3470            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3471    }
3472
3473    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3474        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3475    }
3476}
3477
3478impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3479    type Tx = TX;
3480
3481    fn tx_ref(&self) -> &Self::Tx {
3482        &self.tx
3483    }
3484
3485    fn tx_mut(&mut self) -> &mut Self::Tx {
3486        &mut self.tx
3487    }
3488
3489    fn into_tx(self) -> Self::Tx {
3490        self.tx
3491    }
3492
3493    fn prune_modes_ref(&self) -> &PruneModes {
3494        self.prune_modes_ref()
3495    }
3496
3497    /// Commit database transaction, static files, and pending `RocksDB` batches.
3498    fn commit(self) -> ProviderResult<()> {
3499        // For unwinding it makes more sense to commit the database first, since if
3500        // it is interrupted before the static files commit, we can just
3501        // truncate the static files according to the
3502        // checkpoints on the next start-up.
3503        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3504            self.tx.commit()?;
3505
3506            #[cfg(all(unix, feature = "rocksdb"))]
3507            {
3508                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3509                for batch in batches {
3510                    self.rocksdb_provider.commit_batch(batch)?;
3511                }
3512            }
3513
3514            self.static_file_provider.commit()?;
3515        } else {
3516            // Normal path: finalize() will call sync_all() if not already synced
3517            let mut timings = metrics::CommitTimings::default();
3518
3519            let start = Instant::now();
3520            self.static_file_provider.finalize()?;
3521            timings.sf = start.elapsed();
3522
3523            #[cfg(all(unix, feature = "rocksdb"))]
3524            {
3525                let start = Instant::now();
3526                let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3527                for batch in batches {
3528                    self.rocksdb_provider.commit_batch(batch)?;
3529                }
3530                timings.rocksdb = start.elapsed();
3531            }
3532
3533            let start = Instant::now();
3534            self.tx.commit()?;
3535            timings.mdbx = start.elapsed();
3536
3537            self.metrics.record_commit(&timings);
3538        }
3539
3540        Ok(())
3541    }
3542}
3543
3544impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3545    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3546        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3547    }
3548}
3549
3550impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3551    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3552        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3553    }
3554}
3555
3556impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3557    fn cached_storage_settings(&self) -> StorageSettings {
3558        *self.storage_settings.read()
3559    }
3560
3561    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3562        *self.storage_settings.write() = settings;
3563    }
3564}
3565
3566#[cfg(test)]
3567mod tests {
3568    use super::*;
3569    use crate::{
3570        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3571        BlockWriter,
3572    };
3573    use alloy_primitives::map::B256Map;
3574    use reth_ethereum_primitives::Receipt;
3575    use reth_testing_utils::generators::{self, random_block, BlockParams};
3576    use reth_trie::{Nibbles, StoredNibblesSubKey};
3577
3578    #[test]
3579    fn test_receipts_by_block_range_empty_range() {
3580        let factory = create_test_provider_factory();
3581        let provider = factory.provider().unwrap();
3582
3583        // empty range should return empty vec
3584        let start = 10u64;
3585        let end = 9u64;
3586        let result = provider.receipts_by_block_range(start..=end).unwrap();
3587        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3588    }
3589
3590    #[test]
3591    fn test_receipts_by_block_range_nonexistent_blocks() {
3592        let factory = create_test_provider_factory();
3593        let provider = factory.provider().unwrap();
3594
3595        // non-existent blocks should return empty vecs for each block
3596        let result = provider.receipts_by_block_range(10..=12).unwrap();
3597        assert_eq!(result, vec![vec![], vec![], vec![]]);
3598    }
3599
3600    #[test]
3601    fn test_receipts_by_block_range_single_block() {
3602        let factory = create_test_provider_factory();
3603        let data = BlockchainTestData::default();
3604
3605        let provider_rw = factory.provider_rw().unwrap();
3606        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3607        provider_rw
3608            .write_state(
3609                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3610                crate::OriginalValuesKnown::No,
3611                StateWriteConfig::default(),
3612            )
3613            .unwrap();
3614        provider_rw.insert_block(&data.blocks[0].0).unwrap();
3615        provider_rw
3616            .write_state(
3617                &data.blocks[0].1,
3618                crate::OriginalValuesKnown::No,
3619                StateWriteConfig::default(),
3620            )
3621            .unwrap();
3622        provider_rw.commit().unwrap();
3623
3624        let provider = factory.provider().unwrap();
3625        let result = provider.receipts_by_block_range(1..=1).unwrap();
3626
3627        // should have one vec with one receipt
3628        assert_eq!(result.len(), 1);
3629        assert_eq!(result[0].len(), 1);
3630        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3631    }
3632
3633    #[test]
3634    fn test_receipts_by_block_range_multiple_blocks() {
3635        let factory = create_test_provider_factory();
3636        let data = BlockchainTestData::default();
3637
3638        let provider_rw = factory.provider_rw().unwrap();
3639        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3640        provider_rw
3641            .write_state(
3642                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3643                crate::OriginalValuesKnown::No,
3644                StateWriteConfig::default(),
3645            )
3646            .unwrap();
3647        for i in 0..3 {
3648            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3649            provider_rw
3650                .write_state(
3651                    &data.blocks[i].1,
3652                    crate::OriginalValuesKnown::No,
3653                    StateWriteConfig::default(),
3654                )
3655                .unwrap();
3656        }
3657        provider_rw.commit().unwrap();
3658
3659        let provider = factory.provider().unwrap();
3660        let result = provider.receipts_by_block_range(1..=3).unwrap();
3661
3662        // should have 3 vecs, each with one receipt
3663        assert_eq!(result.len(), 3);
3664        for (i, block_receipts) in result.iter().enumerate() {
3665            assert_eq!(block_receipts.len(), 1);
3666            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3667        }
3668    }
3669
3670    #[test]
3671    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3672        let factory = create_test_provider_factory();
3673        let data = BlockchainTestData::default();
3674
3675        let provider_rw = factory.provider_rw().unwrap();
3676        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3677        provider_rw
3678            .write_state(
3679                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3680                crate::OriginalValuesKnown::No,
3681                StateWriteConfig::default(),
3682            )
3683            .unwrap();
3684
3685        // insert blocks 1-3 with receipts
3686        for i in 0..3 {
3687            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3688            provider_rw
3689                .write_state(
3690                    &data.blocks[i].1,
3691                    crate::OriginalValuesKnown::No,
3692                    StateWriteConfig::default(),
3693                )
3694                .unwrap();
3695        }
3696        provider_rw.commit().unwrap();
3697
3698        let provider = factory.provider().unwrap();
3699        let result = provider.receipts_by_block_range(1..=3).unwrap();
3700
3701        // verify each block has one receipt
3702        assert_eq!(result.len(), 3);
3703        for block_receipts in &result {
3704            assert_eq!(block_receipts.len(), 1);
3705        }
3706    }
3707
3708    #[test]
3709    fn test_receipts_by_block_range_partial_range() {
3710        let factory = create_test_provider_factory();
3711        let data = BlockchainTestData::default();
3712
3713        let provider_rw = factory.provider_rw().unwrap();
3714        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3715        provider_rw
3716            .write_state(
3717                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3718                crate::OriginalValuesKnown::No,
3719                StateWriteConfig::default(),
3720            )
3721            .unwrap();
3722        for i in 0..3 {
3723            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3724            provider_rw
3725                .write_state(
3726                    &data.blocks[i].1,
3727                    crate::OriginalValuesKnown::No,
3728                    StateWriteConfig::default(),
3729                )
3730                .unwrap();
3731        }
3732        provider_rw.commit().unwrap();
3733
3734        let provider = factory.provider().unwrap();
3735
3736        // request range that includes both existing and non-existing blocks
3737        let result = provider.receipts_by_block_range(2..=5).unwrap();
3738        assert_eq!(result.len(), 4);
3739
3740        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3741        assert_eq!(result[0].len(), 1); // block 2
3742        assert_eq!(result[1].len(), 1); // block 3
3743        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3744        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3745
3746        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3747        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3748    }
3749
3750    #[test]
3751    fn test_receipts_by_block_range_all_empty_blocks() {
3752        let factory = create_test_provider_factory();
3753        let mut rng = generators::rng();
3754
3755        // create blocks with no transactions
3756        let mut blocks = Vec::new();
3757        for i in 0..3 {
3758            let block =
3759                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3760            blocks.push(block);
3761        }
3762
3763        let provider_rw = factory.provider_rw().unwrap();
3764        for block in blocks {
3765            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
3766        }
3767        provider_rw.commit().unwrap();
3768
3769        let provider = factory.provider().unwrap();
3770        let result = provider.receipts_by_block_range(1..=3).unwrap();
3771
3772        assert_eq!(result.len(), 3);
3773        for block_receipts in result {
3774            assert_eq!(block_receipts.len(), 0);
3775        }
3776    }
3777
3778    #[test]
3779    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3780        let factory = create_test_provider_factory();
3781        let data = BlockchainTestData::default();
3782
3783        let provider_rw = factory.provider_rw().unwrap();
3784        provider_rw.insert_block(&data.genesis.clone().try_recover().unwrap()).unwrap();
3785        provider_rw
3786            .write_state(
3787                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3788                crate::OriginalValuesKnown::No,
3789                StateWriteConfig::default(),
3790            )
3791            .unwrap();
3792        for i in 0..3 {
3793            provider_rw.insert_block(&data.blocks[i].0).unwrap();
3794            provider_rw
3795                .write_state(
3796                    &data.blocks[i].1,
3797                    crate::OriginalValuesKnown::No,
3798                    StateWriteConfig::default(),
3799                )
3800                .unwrap();
3801        }
3802        provider_rw.commit().unwrap();
3803
3804        let provider = factory.provider().unwrap();
3805
3806        // get receipts using block range method
3807        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3808
3809        // get receipts using individual block calls
3810        let mut individual_results = Vec::new();
3811        for block_num in 1..=3 {
3812            let receipts =
3813                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3814            individual_results.push(receipts);
3815        }
3816
3817        assert_eq!(range_result, individual_results);
3818    }
3819
3820    #[test]
3821    fn test_write_trie_updates_sorted() {
3822        use reth_trie::{
3823            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
3824            BranchNodeCompact, StorageTrieEntry,
3825        };
3826
3827        let factory = create_test_provider_factory();
3828        let provider_rw = factory.provider_rw().unwrap();
3829
3830        // Pre-populate account trie with data that will be deleted
3831        {
3832            let tx = provider_rw.tx_ref();
3833            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
3834
3835            // Add account node that will be deleted
3836            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
3837            cursor
3838                .upsert(
3839                    to_delete,
3840                    &BranchNodeCompact::new(
3841                        0b1010_1010_1010_1010, // state_mask
3842                        0b0000_0000_0000_0000, // tree_mask
3843                        0b0000_0000_0000_0000, // hash_mask
3844                        vec![],
3845                        None,
3846                    ),
3847                )
3848                .unwrap();
3849
3850            // Add account node that will be updated
3851            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
3852            cursor
3853                .upsert(
3854                    to_update,
3855                    &BranchNodeCompact::new(
3856                        0b0101_0101_0101_0101, // old state_mask (will be updated)
3857                        0b0000_0000_0000_0000, // tree_mask
3858                        0b0000_0000_0000_0000, // hash_mask
3859                        vec![],
3860                        None,
3861                    ),
3862                )
3863                .unwrap();
3864        }
3865
3866        // Pre-populate storage tries with data
3867        let storage_address1 = B256::from([1u8; 32]);
3868        let storage_address2 = B256::from([2u8; 32]);
3869        {
3870            let tx = provider_rw.tx_ref();
3871            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
3872
3873            // Add storage nodes for address1 (one will be deleted)
3874            storage_cursor
3875                .upsert(
3876                    storage_address1,
3877                    &StorageTrieEntry {
3878                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
3879                        node: BranchNodeCompact::new(
3880                            0b0011_0011_0011_0011, // will be deleted
3881                            0b0000_0000_0000_0000,
3882                            0b0000_0000_0000_0000,
3883                            vec![],
3884                            None,
3885                        ),
3886                    },
3887                )
3888                .unwrap();
3889
3890            // Add storage nodes for address2 (will be wiped)
3891            storage_cursor
3892                .upsert(
3893                    storage_address2,
3894                    &StorageTrieEntry {
3895                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
3896                        node: BranchNodeCompact::new(
3897                            0b1100_1100_1100_1100, // will be wiped
3898                            0b0000_0000_0000_0000,
3899                            0b0000_0000_0000_0000,
3900                            vec![],
3901                            None,
3902                        ),
3903                    },
3904                )
3905                .unwrap();
3906            storage_cursor
3907                .upsert(
3908                    storage_address2,
3909                    &StorageTrieEntry {
3910                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
3911                        node: BranchNodeCompact::new(
3912                            0b0011_1100_0011_1100, // will be wiped
3913                            0b0000_0000_0000_0000,
3914                            0b0000_0000_0000_0000,
3915                            vec![],
3916                            None,
3917                        ),
3918                    },
3919                )
3920                .unwrap();
3921        }
3922
3923        // Create sorted account trie updates
3924        let account_nodes = vec![
3925            (
3926                Nibbles::from_nibbles([0x1, 0x2]),
3927                Some(BranchNodeCompact::new(
3928                    0b1111_1111_1111_1111, // state_mask (updated)
3929                    0b0000_0000_0000_0000, // tree_mask
3930                    0b0000_0000_0000_0000, // hash_mask (no hashes)
3931                    vec![],
3932                    None,
3933                )),
3934            ),
3935            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
3936            (
3937                Nibbles::from_nibbles([0x5, 0x6]),
3938                Some(BranchNodeCompact::new(
3939                    0b1111_1111_1111_1111, // state_mask
3940                    0b0000_0000_0000_0000, // tree_mask
3941                    0b0000_0000_0000_0000, // hash_mask (no hashes)
3942                    vec![],
3943                    None,
3944                )),
3945            ),
3946        ];
3947
3948        // Create sorted storage trie updates
3949        let storage_trie1 = StorageTrieUpdatesSorted {
3950            is_deleted: false,
3951            storage_nodes: vec![
3952                (
3953                    Nibbles::from_nibbles([0x1, 0x0]),
3954                    Some(BranchNodeCompact::new(
3955                        0b1111_0000_0000_0000, // state_mask
3956                        0b0000_0000_0000_0000, // tree_mask
3957                        0b0000_0000_0000_0000, // hash_mask (no hashes)
3958                        vec![],
3959                        None,
3960                    )),
3961                ),
3962                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
3963            ],
3964        };
3965
3966        let storage_trie2 = StorageTrieUpdatesSorted {
3967            is_deleted: true, // Wipe all storage for this address
3968            storage_nodes: vec![],
3969        };
3970
3971        let mut storage_tries = B256Map::default();
3972        storage_tries.insert(storage_address1, storage_trie1);
3973        storage_tries.insert(storage_address2, storage_trie2);
3974
3975        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3976
3977        // Write the sorted trie updates
3978        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
3979
3980        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
3981        // storage deletion = 5
3982        assert_eq!(num_entries, 5);
3983
3984        // Verify account trie updates were written correctly
3985        let tx = provider_rw.tx_ref();
3986        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
3987
3988        // Check first account node was updated
3989        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
3990        let entry1 = cursor.seek_exact(nibbles1).unwrap();
3991        assert!(entry1.is_some(), "Updated account node should exist");
3992        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
3993        assert_eq!(
3994            entry1.unwrap().1.state_mask,
3995            expected_mask,
3996            "Account node should have updated state_mask"
3997        );
3998
3999        // Check deleted account node no longer exists
4000        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4001        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4002        assert!(entry2.is_none(), "Deleted account node should not exist");
4003
4004        // Check new account node exists
4005        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4006        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4007        assert!(entry3.is_some(), "New account node should exist");
4008
4009        // Verify storage trie updates were written correctly
4010        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4011
4012        // Check storage for address1
4013        let storage_entries1: Vec<_> = storage_cursor
4014            .walk_dup(Some(storage_address1), None)
4015            .unwrap()
4016            .collect::<Result<Vec<_>, _>>()
4017            .unwrap();
4018        assert_eq!(
4019            storage_entries1.len(),
4020            1,
4021            "Storage address1 should have 1 entry after deletion"
4022        );
4023        assert_eq!(
4024            storage_entries1[0].1.nibbles.0,
4025            Nibbles::from_nibbles([0x1, 0x0]),
4026            "Remaining entry should be [0x1, 0x0]"
4027        );
4028
4029        // Check storage for address2 was wiped
4030        let storage_entries2: Vec<_> = storage_cursor
4031            .walk_dup(Some(storage_address2), None)
4032            .unwrap()
4033            .collect::<Result<Vec<_>, _>>()
4034            .unwrap();
4035        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4036
4037        provider_rw.commit().unwrap();
4038    }
4039
4040    #[test]
4041    fn test_prunable_receipts_logic() {
4042        let insert_blocks =
4043            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4044                let mut rng = generators::rng();
4045                for block_num in 0..=tip_block {
4046                    let block = random_block(
4047                        &mut rng,
4048                        block_num,
4049                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4050                    );
4051                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4052                }
4053            };
4054
4055        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4056            let outcome = ExecutionOutcome {
4057                first_block: block,
4058                receipts: vec![vec![Receipt {
4059                    tx_type: Default::default(),
4060                    success: true,
4061                    cumulative_gas_used: block, // identifier to assert against
4062                    logs: vec![],
4063                }]],
4064                ..Default::default()
4065            };
4066            provider_rw
4067                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4068                .unwrap();
4069            provider_rw.commit().unwrap();
4070        };
4071
4072        // Legacy mode (receipts in DB) - should be prunable
4073        {
4074            let factory = create_test_provider_factory();
4075            let storage_settings = StorageSettings::legacy();
4076            factory.set_storage_settings_cache(storage_settings);
4077            let factory = factory.with_prune_modes(PruneModes {
4078                receipts: Some(PruneMode::Before(100)),
4079                ..Default::default()
4080            });
4081
4082            let tip_block = 200u64;
4083            let first_block = 1u64;
4084
4085            // create chain
4086            let provider_rw = factory.provider_rw().unwrap();
4087            insert_blocks(&provider_rw, tip_block, 1);
4088            provider_rw.commit().unwrap();
4089
4090            write_receipts(
4091                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4092                first_block,
4093            );
4094            write_receipts(
4095                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4096                tip_block - 1,
4097            );
4098
4099            let provider = factory.provider().unwrap();
4100
4101            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4102                assert!(provider
4103                    .receipts_by_block(block.into())
4104                    .unwrap()
4105                    .is_some_and(|r| r.len() == num_receipts));
4106            }
4107        }
4108
4109        // Static files mode
4110        {
4111            let factory = create_test_provider_factory();
4112            let storage_settings = StorageSettings::legacy().with_receipts_in_static_files(true);
4113            factory.set_storage_settings_cache(storage_settings);
4114            let factory = factory.with_prune_modes(PruneModes {
4115                receipts: Some(PruneMode::Before(2)),
4116                ..Default::default()
4117            });
4118
4119            let tip_block = 200u64;
4120
4121            // create chain
4122            let provider_rw = factory.provider_rw().unwrap();
4123            insert_blocks(&provider_rw, tip_block, 1);
4124            provider_rw.commit().unwrap();
4125
4126            // Attempt to write receipts for block 0 and 1 (should be skipped)
4127            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4128            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4129
4130            assert!(factory
4131                .static_file_provider()
4132                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4133                .is_none(),);
4134            assert!(factory
4135                .static_file_provider()
4136                .get_highest_static_file_block(StaticFileSegment::Receipts)
4137                .is_some_and(|b| b == 1),);
4138
4139            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4140            // static files.
4141            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4142            assert!(factory
4143                .static_file_provider()
4144                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4145                .is_some_and(|num| num == 2),);
4146
4147            // After having a receipt already in static files, attempt to skip the next receipt by
4148            // changing the prune mode. It should NOT skip it and should still write the receipt,
4149            // since static files do not support gaps.
4150            let factory = factory.with_prune_modes(PruneModes {
4151                receipts: Some(PruneMode::Before(100)),
4152                ..Default::default()
4153            });
4154            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4155            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4156            write_receipts(provider_rw, 3);
4157
4158            // Ensure we can only fetch the 2 last receipts.
4159            //
4160            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4161            // to the block number it belongs to easily identify and assert.
4162            let provider = factory.provider().unwrap();
4163            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4164            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4165                assert!(provider
4166                    .receipts_by_block(num.into())
4167                    .unwrap()
4168                    .is_some_and(|r| r.len() == num_receipts));
4169
4170                let receipt = provider.receipt(num).unwrap();
4171                if num_receipts > 0 {
4172                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4173                } else {
4174                    assert!(receipt.is_none());
4175                }
4176            }
4177        }
4178    }
4179}