Skip to main content

reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
6        static_file::{StaticFileWriteCtx, StaticFileWriter},
7        NodeTypesForProvider, StaticFileProvider,
8    },
9    to_range,
10    traits::{
11        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
12    },
13    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
14    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
15    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
16    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
17    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
18    PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
19    RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
20    StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
21    TransactionsProvider, TransactionsProviderExt, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader, TxReceipt,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, AddressSet, B256Map, HashMap},
31    Address, BlockHash, BlockNumber, StorageKey, StorageValue, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use parking_lot::RwLock;
35use rayon::slice::ParallelSliceMut;
36use reth_chain_state::{ComputedTrieData, ExecutedBlock};
37use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
38use reth_db_api::{
39    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
40    database::{Database, ReaderTxnTracker},
41    models::{
42        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
43        BlockNumberAddressRange, ShardedKey, StorageBeforeTx, StorageSettings,
44        StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList,
50};
51use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, FastInstant as Instant, RecoveredBlock,
55    SealedHeader, StorageEntry,
56};
57use reth_prune_types::{
58    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_UNWIND_SAFE_DISTANCE,
59};
60use reth_stages_types::{StageCheckpoint, StageId};
61use reth_static_file_types::StaticFileSegment;
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
64    NodePrimitivesProvider, StateProvider, StateReader, StateWriteConfig, StorageChangeSetReader,
65    StoragePath, StorageSettingsCache, TryIntoHistoricalStateProvider, WriteStateInput,
66};
67use reth_storage_errors::provider::{ProviderResult, StaticFileWriterError};
68use reth_trie::{
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    HashedPostStateSorted,
71};
72use reth_trie_db::{ChangesetCache, DatabaseStorageTrieCursor, TrieTableAdapter};
73use revm_database::states::{
74    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
75};
76use smallvec::SmallVec;
77use std::{
78    cmp::Ordering,
79    collections::{BTreeMap, BTreeSet},
80    fmt::Debug,
81    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82    path::PathBuf,
83    sync::Arc,
84};
85use tracing::{debug, instrument, trace};
86
87/// Determines the commit order for database operations.
88#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
89pub enum CommitOrder {
90    /// Normal commit order: static files first, then `RocksDB`, then MDBX.
91    #[default]
92    Normal,
93    /// Unwind commit order: MDBX first, then `RocksDB`, then static files.
94    /// Used for unwind operations to allow recovery by truncating static files on restart.
95    Unwind,
96}
97
98impl CommitOrder {
99    /// Returns true if this is unwind commit order.
100    pub const fn is_unwind(&self) -> bool {
101        matches!(self, Self::Unwind)
102    }
103}
104
105/// A [`DatabaseProvider`] that holds a read-only database transaction.
106pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
107
108/// A [`DatabaseProvider`] that holds a read-write database transaction.
109///
110/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
111/// Once that issue is solved, we can probably revert back to being an alias type.
112#[derive(Debug)]
113pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
114    pub DatabaseProvider<<DB as Database>::TXMut, N>,
115);
116
117impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
118    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
119
120    fn deref(&self) -> &Self::Target {
121        &self.0
122    }
123}
124
125impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
126    fn deref_mut(&mut self) -> &mut Self::Target {
127        &mut self.0
128    }
129}
130
131impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
132    for DatabaseProviderRW<DB, N>
133{
134    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
135        &self.0
136    }
137}
138
139impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
140    /// Commit database transaction and static file if it exists.
141    pub fn commit(self) -> ProviderResult<()> {
142        self.0.commit()
143    }
144
145    /// Consume `DbTx` or `DbTxMut`.
146    pub fn into_tx(self) -> <DB as Database>::TXMut {
147        self.0.into_tx()
148    }
149
150    /// Override the minimum pruning distance for testing purposes.
151    #[cfg(any(test, feature = "test-utils"))]
152    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
153        self.0.minimum_pruning_distance = distance;
154        self
155    }
156}
157
158impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
159    for DatabaseProvider<<DB as Database>::TXMut, N>
160{
161    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
162        provider.0
163    }
164}
165
166/// Mode for [`DatabaseProvider::save_blocks`].
167#[derive(Debug, Clone, Copy, PartialEq, Eq)]
168pub enum SaveBlocksMode {
169    /// Full mode: write block structure + receipts + state + trie.
170    /// Used by engine/production code.
171    Full,
172    /// Blocks only: write block structure (headers, txs, senders, indices).
173    /// Receipts/state/trie are skipped - they may come later via separate calls.
174    /// Used by `insert_block`.
175    BlocksOnly,
176}
177
178impl SaveBlocksMode {
179    /// Returns `true` if this is [`SaveBlocksMode::Full`].
180    pub const fn with_state(self) -> bool {
181        matches!(self, Self::Full)
182    }
183}
184
185/// A provider struct that fetches data from the database.
186/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
187pub struct DatabaseProvider<TX, N: NodeTypes> {
188    /// Database transaction.
189    tx: TX,
190    /// Chain spec
191    chain_spec: Arc<N::ChainSpec>,
192    /// Static File provider
193    static_file_provider: StaticFileProvider<N::Primitives>,
194    /// Pruning configuration
195    prune_modes: PruneModes,
196    /// Node storage handler.
197    storage: Arc<N::Storage>,
198    /// Storage configuration settings for this node
199    storage_settings: Arc<RwLock<StorageSettings>>,
200    /// `RocksDB` provider
201    rocksdb_provider: RocksDBProvider,
202    /// Changeset cache for trie unwinding
203    changeset_cache: ChangesetCache,
204    /// Task runtime for spawning parallel I/O work.
205    runtime: reth_tasks::Runtime,
206    /// Path to the database directory.
207    db_path: PathBuf,
208    /// Pending `RocksDB` batches to be committed at provider commit time.
209    pending_rocksdb_batches: PendingRocksDBBatches,
210    /// Commit order for database operations.
211    commit_order: CommitOrder,
212    /// Minimum distance from tip required for pruning
213    minimum_pruning_distance: u64,
214    /// Database provider metrics
215    metrics: metrics::DatabaseProviderMetrics,
216    /// Database handle used to inspect active MDBX readers during unwind commits.
217    reader_txn_tracker: Option<Arc<dyn ReaderTxnTracker>>,
218}
219
220impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        let mut s = f.debug_struct("DatabaseProvider");
223        s.field("tx", &self.tx)
224            .field("chain_spec", &self.chain_spec)
225            .field("static_file_provider", &self.static_file_provider)
226            .field("prune_modes", &self.prune_modes)
227            .field("storage", &self.storage)
228            .field("storage_settings", &self.storage_settings)
229            .field("rocksdb_provider", &self.rocksdb_provider)
230            .field("changeset_cache", &self.changeset_cache)
231            .field("runtime", &self.runtime)
232            .field("pending_rocksdb_batches", &"<pending batches>")
233            .field("commit_order", &self.commit_order)
234            .field("minimum_pruning_distance", &self.minimum_pruning_distance)
235            .field("reader_txn_tracker", &"<reader txn tracker>")
236            .finish()
237    }
238}
239
240impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
241    /// Returns reference to prune modes.
242    pub const fn prune_modes_ref(&self) -> &PruneModes {
243        &self.prune_modes
244    }
245
246    /// Sets the minimum pruning distance.
247    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
248        self.minimum_pruning_distance = distance;
249        self
250    }
251
252    /// Attaches reader tracking so unwind commits can wait on active readers.
253    pub(crate) fn with_reader_txn_tracker<T>(mut self, reader_txn_tracker: T) -> Self
254    where
255        T: ReaderTxnTracker + 'static,
256    {
257        self.reader_txn_tracker = Some(Arc::new(reader_txn_tracker));
258        self
259    }
260}
261
262impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
263    /// Commits unwind writes in MDBX -> `RocksDB` -> static-file order.
264    ///
265    /// This keeps MDBX as the first durable step so an interrupted unwind can be recovered by
266    /// truncating static files from checkpoints on the next startup.
267    ///
268    /// This waits after the MDBX commit so readers holding older MDBX-visible views cannot overlap
269    /// later cross-store unwind steps.
270    ///
271    /// Historical `storage_v2` reads ignore `RocksDB` history entries above their MDBX-visible tip,
272    /// so no additional post-`RocksDB` wait is needed before static-file commit.
273    fn commit_unwind(self) -> ProviderResult<()> {
274        let storage_v2 = self.cached_storage_settings().storage_v2;
275        let reader_txn_tracker = self.reader_txn_tracker.clone();
276        self.tx.commit()?;
277
278        if let Some(reader_txn_tracker) = reader_txn_tracker.as_ref() {
279            reader_txn_tracker.wait_for_pre_commit_readers();
280        }
281
282        if storage_v2 {
283            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
284            for batch in batches {
285                self.rocksdb_provider.commit_batch(batch)?;
286            }
287        }
288
289        self.static_file_provider.commit()?;
290        Ok(())
291    }
292
293    /// State provider for latest state
294    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
295        trace!(target: "providers::db", "Returning latest state provider");
296        Box::new(LatestStateProviderRef::new(self))
297    }
298
299    /// Storage provider for state at that given block hash
300    pub fn history_by_block_hash<'a>(
301        &'a self,
302        block_hash: BlockHash,
303    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
304        let block_number =
305            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
306        self.history_by_block_number(block_number)
307    }
308
309    /// Storage provider for state at that given block number
310    pub fn history_by_block_number<'a>(
311        &'a self,
312        mut block_number: BlockNumber,
313    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
314        if block_number == self.best_block_number().unwrap_or_default() &&
315            block_number == self.last_block_number().unwrap_or_default()
316        {
317            return Ok(Box::new(LatestStateProviderRef::new(self)))
318        }
319
320        // +1 as the changeset that we want is the one that was applied after this block.
321        block_number += 1;
322
323        let account_history_prune_checkpoint =
324            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
325        let storage_history_prune_checkpoint =
326            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
327
328        let mut state_provider =
329            HistoricalStateProviderRef::new(self, block_number, self.changeset_cache.clone());
330        // If we pruned account or storage history, we can't return state on every historical block.
331        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
332        if let Some(prune_checkpoint_block_number) =
333            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
334        {
335            state_provider = state_provider.with_lowest_available_account_history_block_number(
336                prune_checkpoint_block_number + 1,
337            );
338        }
339        if let Some(prune_checkpoint_block_number) =
340            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
341        {
342            state_provider = state_provider.with_lowest_available_storage_history_block_number(
343                prune_checkpoint_block_number + 1,
344            );
345        }
346
347        Ok(Box::new(state_provider))
348    }
349
350    #[cfg(feature = "test-utils")]
351    /// Sets the prune modes for provider.
352    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
353        self.prune_modes = prune_modes;
354    }
355}
356
357impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
358    type Primitives = N::Primitives;
359}
360
361impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
362    /// Returns a static file provider
363    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
364        self.static_file_provider.clone()
365    }
366
367    fn get_static_file_writer(
368        &self,
369        block: BlockNumber,
370        segment: StaticFileSegment,
371    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
372        self.static_file_provider.get_writer(block, segment)
373    }
374}
375
376impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
377    /// Returns the `RocksDB` provider.
378    fn rocksdb_provider(&self) -> RocksDBProvider {
379        self.rocksdb_provider.clone()
380    }
381
382    fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
383        self.pending_rocksdb_batches.lock().push(batch);
384    }
385
386    fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> {
387        let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
388        for batch in batches {
389            self.rocksdb_provider.commit_batch(batch)?;
390        }
391        Ok(())
392    }
393}
394
395impl<TX: Debug + Send, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
396    for DatabaseProvider<TX, N>
397{
398    type ChainSpec = N::ChainSpec;
399
400    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
401        self.chain_spec.clone()
402    }
403}
404
405impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
406    /// Creates a provider with an inner read-write transaction.
407    #[expect(clippy::too_many_arguments)]
408    fn new_rw_inner(
409        tx: TX,
410        chain_spec: Arc<N::ChainSpec>,
411        static_file_provider: StaticFileProvider<N::Primitives>,
412        prune_modes: PruneModes,
413        storage: Arc<N::Storage>,
414        storage_settings: Arc<RwLock<StorageSettings>>,
415        rocksdb_provider: RocksDBProvider,
416        changeset_cache: ChangesetCache,
417        runtime: reth_tasks::Runtime,
418        db_path: PathBuf,
419        commit_order: CommitOrder,
420    ) -> Self {
421        Self {
422            tx,
423            chain_spec,
424            static_file_provider,
425            prune_modes,
426            storage,
427            storage_settings,
428            rocksdb_provider,
429            changeset_cache,
430            runtime,
431            db_path,
432            pending_rocksdb_batches: Default::default(),
433            commit_order,
434            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
435            metrics: metrics::DatabaseProviderMetrics::default(),
436            reader_txn_tracker: None,
437        }
438    }
439
440    /// Creates a provider with an inner read-write transaction using normal commit order.
441    #[expect(clippy::too_many_arguments)]
442    pub fn new_rw(
443        tx: TX,
444        chain_spec: Arc<N::ChainSpec>,
445        static_file_provider: StaticFileProvider<N::Primitives>,
446        prune_modes: PruneModes,
447        storage: Arc<N::Storage>,
448        storage_settings: Arc<RwLock<StorageSettings>>,
449        rocksdb_provider: RocksDBProvider,
450        changeset_cache: ChangesetCache,
451        runtime: reth_tasks::Runtime,
452        db_path: PathBuf,
453    ) -> Self {
454        Self::new_rw_inner(
455            tx,
456            chain_spec,
457            static_file_provider,
458            prune_modes,
459            storage,
460            storage_settings,
461            rocksdb_provider,
462            changeset_cache,
463            runtime,
464            db_path,
465            CommitOrder::Normal,
466        )
467    }
468
469    /// Creates a provider with an inner read-write transaction using unwind commit order.
470    #[expect(clippy::too_many_arguments)]
471    pub fn new_unwind_rw(
472        tx: TX,
473        chain_spec: Arc<N::ChainSpec>,
474        static_file_provider: StaticFileProvider<N::Primitives>,
475        prune_modes: PruneModes,
476        storage: Arc<N::Storage>,
477        storage_settings: Arc<RwLock<StorageSettings>>,
478        rocksdb_provider: RocksDBProvider,
479        changeset_cache: ChangesetCache,
480        runtime: reth_tasks::Runtime,
481        db_path: PathBuf,
482    ) -> Self {
483        Self::new_rw_inner(
484            tx,
485            chain_spec,
486            static_file_provider,
487            prune_modes,
488            storage,
489            storage_settings,
490            rocksdb_provider,
491            changeset_cache,
492            runtime,
493            db_path,
494            CommitOrder::Unwind,
495        )
496    }
497}
498
499impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
500    fn as_ref(&self) -> &Self {
501        self
502    }
503}
504
505impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
506    /// Executes a closure with a `RocksDB` batch, automatically registering it for commit.
507    ///
508    /// This helper encapsulates all the cfg-gated `RocksDB` batch handling.
509    pub fn with_rocksdb_batch<F, R>(&self, f: F) -> ProviderResult<R>
510    where
511        F: FnOnce(RocksBatchArg<'_>) -> ProviderResult<(R, Option<RawRocksDBBatch>)>,
512    {
513        let rocksdb = self.rocksdb_provider();
514        let rocksdb_batch = rocksdb.batch();
515
516        let (result, raw_batch) = f(rocksdb_batch)?;
517
518        if let Some(batch) = raw_batch {
519            self.set_pending_rocksdb_batch(batch);
520        }
521        let _ = raw_batch; // silence unused warning when rocksdb feature is disabled
522
523        Ok(result)
524    }
525
526    /// Creates the context for static file writes.
527    fn static_file_write_ctx(
528        &self,
529        save_mode: SaveBlocksMode,
530        first_block: BlockNumber,
531        last_block: BlockNumber,
532    ) -> ProviderResult<StaticFileWriteCtx> {
533        let tip = self.last_block_number()?.max(last_block);
534        Ok(StaticFileWriteCtx {
535            write_senders: EitherWriterDestination::senders(self).is_static_file() &&
536                self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()),
537            write_receipts: save_mode.with_state() &&
538                EitherWriter::receipts_destination(self).is_static_file(),
539            write_account_changesets: save_mode.with_state() &&
540                EitherWriterDestination::account_changesets(self).is_static_file(),
541            write_storage_changesets: save_mode.with_state() &&
542                EitherWriterDestination::storage_changesets(self).is_static_file(),
543            tip,
544            receipts_prune_mode: self.prune_modes.receipts,
545            // Receipts are prunable if no receipts exist in SF yet and within pruning distance
546            receipts_prunable: self
547                .static_file_provider
548                .get_highest_static_file_tx(StaticFileSegment::Receipts)
549                .is_none() &&
550                PruneMode::Distance(self.minimum_pruning_distance)
551                    .should_prune(first_block, tip),
552        })
553    }
554
555    /// Creates the context for `RocksDB` writes.
556    fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
557        RocksDBWriteCtx {
558            first_block_number: first_block,
559            prune_tx_lookup: self.prune_modes.transaction_lookup,
560            storage_settings: self.cached_storage_settings(),
561            pending_batches: self.pending_rocksdb_batches.clone(),
562        }
563    }
564
565    /// Writes executed blocks and state to storage.
566    ///
567    /// This method parallelizes static file (SF) writes with MDBX writes.
568    /// The SF thread writes headers, transactions, senders (if SF), and receipts (if SF, Full mode
569    /// only). The main thread writes MDBX data (indices, state, trie - Full mode only).
570    ///
571    /// Use [`SaveBlocksMode::Full`] for production (includes receipts, state, trie).
572    /// Use [`SaveBlocksMode::BlocksOnly`] for block structure only (used by `insert_block`).
573    #[instrument(level = "debug", target = "providers::db", skip_all, fields(block_count = blocks.len()))]
574    pub fn save_blocks(
575        &self,
576        blocks: Vec<ExecutedBlock<N::Primitives>>,
577        save_mode: SaveBlocksMode,
578    ) -> ProviderResult<()> {
579        if blocks.is_empty() {
580            debug!(target: "providers::db", "Attempted to write empty block range");
581            return Ok(())
582        }
583
584        let total_start = Instant::now();
585        let block_count = blocks.len() as u64;
586        let first_number = blocks.first().unwrap().recovered_block().number();
587        let last_block_number = blocks.last().unwrap().recovered_block().number();
588
589        debug!(target: "providers::db", block_count, "Writing blocks and execution data to storage");
590
591        // Compute tx_nums upfront (both threads need these)
592        let first_tx_num = self
593            .tx
594            .cursor_read::<tables::TransactionBlocks>()?
595            .last()?
596            .map(|(n, _)| n + 1)
597            .unwrap_or_default();
598
599        let tx_nums: SmallVec<[TxNumber; 4]> = {
600            let mut nums = SmallVec::with_capacity(blocks.len());
601            let mut current = first_tx_num;
602            for block in &blocks {
603                nums.push(current);
604                current += block.recovered_block().body().transaction_count() as u64;
605            }
606            nums
607        };
608
609        let mut timings =
610            metrics::SaveBlocksTimings { batch_size: block_count, ..Default::default() };
611
612        // avoid capturing &self.tx in scope below.
613        let sf_provider = &self.static_file_provider;
614        let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
615        let rocksdb_provider = self.rocksdb_provider.clone();
616        let rocksdb_ctx = self.rocksdb_write_ctx(first_number);
617        let rocksdb_enabled = rocksdb_ctx.storage_settings.storage_v2;
618
619        let mut sf_result = None;
620        let mut rocksdb_result = None;
621
622        // Write to all backends in parallel.
623        let runtime = &self.runtime;
624        // Propagate tracing context into rayon-spawned threads so that static file
625        // and RocksDB write spans appear as children of save_blocks in traces.
626        let span = tracing::Span::current();
627        runtime.storage_pool().in_place_scope(|s| {
628            // SF writes
629            s.spawn(|_| {
630                let _guard = span.enter();
631                let start = Instant::now();
632                sf_result = Some(
633                    sf_provider
634                        .write_blocks_data(&blocks, &tx_nums, sf_ctx, runtime)
635                        .map(|()| start.elapsed()),
636                );
637            });
638
639            // RocksDB writes
640            if rocksdb_enabled {
641                s.spawn(|_| {
642                    let _guard = span.enter();
643                    let start = Instant::now();
644                    rocksdb_result = Some(
645                        rocksdb_provider
646                            .write_blocks_data(&blocks, &tx_nums, rocksdb_ctx, runtime)
647                            .map(|()| start.elapsed()),
648                    );
649                });
650            }
651
652            // MDBX writes
653            let mdbx_start = Instant::now();
654
655            // Collect all transaction hashes across all blocks, sort them, and write in batch
656            if !self.cached_storage_settings().storage_v2 &&
657                self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full())
658            {
659                let start = Instant::now();
660                let total_tx_count: usize =
661                    blocks.iter().map(|b| b.recovered_block().body().transaction_count()).sum();
662                let mut all_tx_hashes = Vec::with_capacity(total_tx_count);
663                for (i, block) in blocks.iter().enumerate() {
664                    let recovered_block = block.recovered_block();
665                    for (tx_num, transaction) in
666                        (tx_nums[i]..).zip(recovered_block.body().transactions_iter())
667                    {
668                        all_tx_hashes.push((*transaction.tx_hash(), tx_num));
669                    }
670                }
671
672                // Sort by hash for optimal MDBX insertion performance
673                all_tx_hashes.sort_unstable_by_key(|(hash, _)| *hash);
674
675                // Write all transaction hash numbers in a single batch
676                self.with_rocksdb_batch(|batch| {
677                    let mut tx_hash_writer =
678                        EitherWriter::new_transaction_hash_numbers(self, batch)?;
679                    tx_hash_writer.put_transaction_hash_numbers_batch(all_tx_hashes, false)?;
680                    let raw_batch = tx_hash_writer.into_raw_rocksdb_batch();
681                    Ok(((), raw_batch))
682                })?;
683                self.metrics.record_duration(
684                    metrics::Action::InsertTransactionHashNumbers,
685                    start.elapsed(),
686                );
687            }
688
689            for (i, block) in blocks.iter().enumerate() {
690                let recovered_block = block.recovered_block();
691
692                let start = Instant::now();
693                self.insert_block_mdbx_only(recovered_block, tx_nums[i])?;
694                timings.insert_block += start.elapsed();
695
696                if save_mode.with_state() {
697                    let execution_output = block.execution_outcome();
698
699                    // Write state and changesets to the database.
700                    // Must be written after blocks because of the receipt lookup.
701                    // Skip receipts/account changesets if they're being written to static files.
702                    let start = Instant::now();
703                    self.write_state(
704                        WriteStateInput::Single {
705                            outcome: execution_output,
706                            block: recovered_block.number(),
707                        },
708                        OriginalValuesKnown::No,
709                        StateWriteConfig {
710                            write_receipts: !sf_ctx.write_receipts,
711                            write_account_changesets: !sf_ctx.write_account_changesets,
712                            write_storage_changesets: !sf_ctx.write_storage_changesets,
713                        },
714                    )?;
715                    timings.write_state += start.elapsed();
716                }
717            }
718
719            // Write all hashed state and trie updates in single batches.
720            // This reduces cursor open/close overhead from N calls to 1.
721            if save_mode.with_state() {
722                // Blocks are oldest-to-newest, merge_batch expects newest-to-oldest.
723                let start = Instant::now();
724                let merged_hashed_state = HashedPostStateSorted::merge_batch(
725                    blocks.iter().rev().map(|b| b.trie_data().hashed_state),
726                );
727                if !merged_hashed_state.is_empty() {
728                    self.write_hashed_state(&merged_hashed_state)?;
729                }
730                timings.write_hashed_state += start.elapsed();
731
732                let start = Instant::now();
733                let merged_trie =
734                    TrieUpdatesSorted::merge_batch(blocks.iter().rev().map(|b| b.trie_updates()));
735                if !merged_trie.is_empty() {
736                    self.write_trie_updates_sorted(&merged_trie)?;
737                }
738                timings.write_trie_updates += start.elapsed();
739            }
740
741            // Full mode: update history indices
742            if save_mode.with_state() {
743                let start = Instant::now();
744                self.update_history_indices(first_number..=last_block_number)?;
745                timings.update_history_indices = start.elapsed();
746            }
747
748            // Update pipeline progress
749            let start = Instant::now();
750            self.update_pipeline_stages(last_block_number, false)?;
751            timings.update_pipeline_stages = start.elapsed();
752
753            timings.mdbx = mdbx_start.elapsed();
754
755            Ok::<_, ProviderError>(())
756        })?;
757
758        // Collect results from spawned tasks
759        timings.sf = sf_result.ok_or(StaticFileWriterError::ThreadPanic("static file"))??;
760
761        if rocksdb_enabled {
762            timings.rocksdb = rocksdb_result.ok_or_else(|| {
763                ProviderError::Database(reth_db_api::DatabaseError::Other(
764                    "RocksDB thread panicked".into(),
765                ))
766            })??;
767        }
768
769        timings.total = total_start.elapsed();
770
771        self.metrics.record_save_blocks(&timings);
772        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
773
774        Ok(())
775    }
776
777    /// Writes MDBX-only data for a block (indices, lookups, and senders if configured for MDBX).
778    ///
779    /// SF data (headers, transactions, senders if SF, receipts if SF) must be written separately.
780    #[instrument(level = "debug", target = "providers::db", skip_all)]
781    fn insert_block_mdbx_only(
782        &self,
783        block: &RecoveredBlock<BlockTy<N>>,
784        first_tx_num: TxNumber,
785    ) -> ProviderResult<StoredBlockBodyIndices> {
786        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) &&
787            EitherWriterDestination::senders(self).is_database()
788        {
789            let start = Instant::now();
790            let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
791            let mut cursor = self.tx.cursor_write::<tables::TransactionSenders>()?;
792            for (tx_num, sender) in tx_nums_iter.zip(block.senders_iter().copied()) {
793                cursor.append(tx_num, &sender)?;
794            }
795            self.metrics
796                .record_duration(metrics::Action::InsertTransactionSenders, start.elapsed());
797        }
798
799        let block_number = block.number();
800        let tx_count = block.body().transaction_count() as u64;
801
802        let start = Instant::now();
803        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
804        self.metrics.record_duration(metrics::Action::InsertHeaderNumbers, start.elapsed());
805
806        self.write_block_body_indices(block_number, block.body(), first_tx_num, tx_count)?;
807
808        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
809    }
810
811    /// Writes MDBX block body indices (`BlockBodyIndices`, `TransactionBlocks`,
812    /// `Ommers`/`Withdrawals`).
813    fn write_block_body_indices(
814        &self,
815        block_number: BlockNumber,
816        body: &BodyTy<N>,
817        first_tx_num: TxNumber,
818        tx_count: u64,
819    ) -> ProviderResult<()> {
820        // MDBX: BlockBodyIndices
821        let start = Instant::now();
822        self.tx
823            .cursor_write::<tables::BlockBodyIndices>()?
824            .append(block_number, &StoredBlockBodyIndices { first_tx_num, tx_count })?;
825        self.metrics.record_duration(metrics::Action::InsertBlockBodyIndices, start.elapsed());
826
827        // MDBX: TransactionBlocks (last tx -> block mapping)
828        if tx_count > 0 {
829            let start = Instant::now();
830            self.tx
831                .cursor_write::<tables::TransactionBlocks>()?
832                .append(first_tx_num + tx_count - 1, &block_number)?;
833            self.metrics.record_duration(metrics::Action::InsertTransactionBlocks, start.elapsed());
834        }
835
836        // MDBX: Ommers/Withdrawals
837        self.storage.writer().write_block_bodies(self, vec![(block_number, Some(body))])?;
838
839        Ok(())
840    }
841
842    /// Unwinds trie state starting at and including the given block.
843    ///
844    /// This includes calculating the resulted state root and comparing it with the parent block
845    /// state root.
846    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
847        let changed_accounts = self.account_changesets_range(from..)?;
848
849        // Unwind account hashes.
850        self.unwind_account_hashing(changed_accounts.iter())?;
851
852        // Unwind account history indices.
853        self.unwind_account_history_indices(changed_accounts.iter())?;
854
855        let changed_storages = self.storage_changesets_range(from..)?;
856
857        // Unwind storage hashes.
858        self.unwind_storage_hashing(changed_storages.iter().copied())?;
859
860        // Unwind storage history indices.
861        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
862
863        // Unwind accounts/storages trie tables using the revert.
864        // Get the database tip block number
865        let db_tip_block = self
866            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
867            .as_ref()
868            .map(|chk| chk.block_number)
869            .ok_or_else(|| ProviderError::InsufficientChangesets {
870                requested: from,
871                available: 0..=0,
872            })?;
873
874        let trie_revert = self.changeset_cache.get_or_compute_range(self, from..=db_tip_block)?;
875        self.write_trie_updates_sorted(&trie_revert)?;
876
877        Ok(())
878    }
879
880    /// Removes receipts from all transactions starting with provided number (inclusive).
881    fn remove_receipts_from(
882        &self,
883        from_tx: TxNumber,
884        last_block: BlockNumber,
885    ) -> ProviderResult<()> {
886        // iterate over block body and remove receipts
887        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
888
889        if EitherWriter::receipts_destination(self).is_static_file() {
890            let static_file_receipt_num =
891                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
892
893            let to_delete = static_file_receipt_num
894                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
895                .unwrap_or_default();
896
897            self.static_file_provider
898                .latest_writer(StaticFileSegment::Receipts)?
899                .prune_receipts(to_delete, last_block)?;
900        }
901
902        Ok(())
903    }
904
905    /// Writes bytecodes to MDBX.
906    fn write_bytecodes(
907        &self,
908        bytecodes: impl IntoIterator<Item = (B256, Bytecode)>,
909    ) -> ProviderResult<()> {
910        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
911        for (hash, bytecode) in bytecodes {
912            bytecodes_cursor.upsert(hash, &bytecode)?;
913        }
914        Ok(())
915    }
916}
917
918impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
919    fn try_into_history_at_block(
920        self,
921        mut block_number: BlockNumber,
922    ) -> ProviderResult<StateProviderBox> {
923        let best_block = self.best_block_number().unwrap_or_default();
924
925        // Reject requests for blocks beyond the best block
926        if block_number > best_block {
927            return Err(ProviderError::BlockNotExecuted {
928                requested: block_number,
929                executed: best_block,
930            });
931        }
932
933        // If requesting state at the best block, use the latest state provider
934        if block_number == best_block {
935            return Ok(Box::new(LatestStateProvider::new(self)));
936        }
937
938        // +1 as the changeset that we want is the one that was applied after this block.
939        block_number += 1;
940
941        let account_history_prune_checkpoint =
942            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
943        let storage_history_prune_checkpoint =
944            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
945        let changeset_cache = self.changeset_cache.clone();
946
947        let mut state_provider = HistoricalStateProvider::new(self, block_number, changeset_cache);
948
949        // If we pruned account or storage history, we can't return state on every historical block.
950        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
951        if let Some(prune_checkpoint_block_number) =
952            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
953        {
954            state_provider = state_provider.with_lowest_available_account_history_block_number(
955                prune_checkpoint_block_number + 1,
956            );
957        }
958        if let Some(prune_checkpoint_block_number) =
959            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
960        {
961            state_provider = state_provider.with_lowest_available_storage_history_block_number(
962                prune_checkpoint_block_number + 1,
963            );
964        }
965
966        Ok(Box::new(state_provider))
967    }
968}
969
970/// For a given key, unwind all history shards that contain block numbers at or above the given
971/// block number.
972///
973/// S - Sharded key subtype.
974/// T - Table to walk over.
975/// C - Cursor implementation.
976///
977/// This function walks the entries from the given start key and deletes all shards that belong to
978/// the key and contain block numbers at or above the given block number. Shards entirely below
979/// the block number are preserved.
980///
981/// The boundary shard (the shard that spans across the block number) is removed from the database.
982/// Any indices that are below the block number are filtered out and returned for reinsertion.
983/// The boundary shard is returned for reinsertion (if it's not empty).
984fn unwind_history_shards<S, T, C>(
985    cursor: &mut C,
986    start_key: T::Key,
987    block_number: BlockNumber,
988    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
989) -> ProviderResult<Vec<u64>>
990where
991    T: Table<Value = BlockNumberList>,
992    T::Key: AsRef<ShardedKey<S>>,
993    C: DbCursorRO<T> + DbCursorRW<T>,
994{
995    // Start from the given key and iterate through shards
996    let mut item = cursor.seek_exact(start_key)?;
997    while let Some((sharded_key, list)) = item {
998        // If the shard does not belong to the key, break.
999        if !shard_belongs_to_key(&sharded_key) {
1000            break
1001        }
1002
1003        // Always delete the current shard from the database first
1004        // We'll decide later what (if anything) to reinsert
1005        cursor.delete_current()?;
1006
1007        // Get the first (lowest) block number in this shard
1008        // All block numbers in a shard are sorted in ascending order
1009        let first = list.iter().next().expect("List can't be empty");
1010
1011        // Case 1: Entire shard is at or above the unwinding point
1012        // Keep it deleted (don't return anything for reinsertion)
1013        if first >= block_number {
1014            item = cursor.prev()?;
1015            continue
1016        }
1017        // Case 2: This is a boundary shard (spans across the unwinding point)
1018        // The shard contains some blocks below and some at/above the unwinding point
1019        else if block_number <= sharded_key.as_ref().highest_block_number {
1020            // Return only the block numbers that are below the unwinding point
1021            // These will be reinserted to preserve the historical data
1022            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
1023        }
1024        // Case 3: Entire shard is below the unwinding point
1025        // Return all block numbers for reinsertion (preserve entire shard)
1026        return Ok(list.iter().collect::<Vec<_>>())
1027    }
1028
1029    // No shards found or all processed
1030    Ok(Vec::new())
1031}
1032
1033impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1034    /// Creates a provider with an inner read-only transaction.
1035    #[expect(clippy::too_many_arguments)]
1036    pub fn new(
1037        tx: TX,
1038        chain_spec: Arc<N::ChainSpec>,
1039        static_file_provider: StaticFileProvider<N::Primitives>,
1040        prune_modes: PruneModes,
1041        storage: Arc<N::Storage>,
1042        storage_settings: Arc<RwLock<StorageSettings>>,
1043        rocksdb_provider: RocksDBProvider,
1044        changeset_cache: ChangesetCache,
1045        runtime: reth_tasks::Runtime,
1046        db_path: PathBuf,
1047    ) -> Self {
1048        Self {
1049            tx,
1050            chain_spec,
1051            static_file_provider,
1052            prune_modes,
1053            storage,
1054            storage_settings,
1055            rocksdb_provider,
1056            changeset_cache,
1057            runtime,
1058            db_path,
1059            pending_rocksdb_batches: Default::default(),
1060            commit_order: CommitOrder::Normal,
1061            minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
1062            metrics: metrics::DatabaseProviderMetrics::default(),
1063            reader_txn_tracker: None,
1064        }
1065    }
1066
1067    /// Consume `DbTx` or `DbTxMut`.
1068    pub fn into_tx(self) -> TX {
1069        self.tx
1070    }
1071
1072    /// Pass `DbTx` or `DbTxMut` mutable reference.
1073    pub const fn tx_mut(&mut self) -> &mut TX {
1074        &mut self.tx
1075    }
1076
1077    /// Pass `DbTx` or `DbTxMut` immutable reference.
1078    pub const fn tx_ref(&self) -> &TX {
1079        &self.tx
1080    }
1081
1082    /// Returns a reference to the chain specification.
1083    pub fn chain_spec(&self) -> &N::ChainSpec {
1084        &self.chain_spec
1085    }
1086}
1087
1088impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
1089    fn recovered_block<H, HF, B, BF>(
1090        &self,
1091        id: BlockHashOrNumber,
1092        _transaction_kind: TransactionVariant,
1093        header_by_number: HF,
1094        construct_block: BF,
1095    ) -> ProviderResult<Option<B>>
1096    where
1097        H: AsRef<HeaderTy<N>>,
1098        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
1099        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
1100    {
1101        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
1102        let earliest_available = self.static_file_provider.earliest_history_height();
1103        if block_number < earliest_available {
1104            return Err(ProviderError::BlockExpired { requested: block_number, earliest_available })
1105        }
1106        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
1107
1108        // Get the block body
1109        //
1110        // If the body indices are not found, this means that the transactions either do not exist
1111        // in the database yet, or they do exit but are not indexed. If they exist but are not
1112        // indexed, we don't have enough information to return the block anyways, so we return
1113        // `None`.
1114        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
1115
1116        let tx_range = body.tx_num_range();
1117
1118        let transactions = if tx_range.is_empty() {
1119            vec![]
1120        } else {
1121            self.transactions_by_tx_range(tx_range.clone())?
1122        };
1123
1124        let body = self
1125            .storage
1126            .reader()
1127            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
1128            .pop()
1129            .ok_or(ProviderError::InvalidStorageOutput)?;
1130
1131        let senders = if tx_range.is_empty() {
1132            vec![]
1133        } else {
1134            let known_senders: HashMap<TxNumber, Address> =
1135                EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1136
1137            let mut senders = Vec::with_capacity(body.transactions().len());
1138            for (tx_num, tx) in tx_range.zip(body.transactions()) {
1139                match known_senders.get(&tx_num) {
1140                    None => {
1141                        let sender = tx.recover_signer_unchecked()?;
1142                        senders.push(sender);
1143                    }
1144                    Some(sender) => senders.push(*sender),
1145                }
1146            }
1147            senders
1148        };
1149
1150        construct_block(header, body, senders)
1151    }
1152
1153    /// Returns a range of blocks from the database.
1154    ///
1155    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1156    /// construct blocks from the following inputs:
1157    ///     – Header
1158    ///     - Range of transaction numbers
1159    ///     – Ommers
1160    ///     – Withdrawals
1161    ///     – Senders
1162    fn block_range<F, H, HF, R>(
1163        &self,
1164        range: RangeInclusive<BlockNumber>,
1165        headers_range: HF,
1166        mut assemble_block: F,
1167    ) -> ProviderResult<Vec<R>>
1168    where
1169        H: AsRef<HeaderTy<N>>,
1170        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1171        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
1172    {
1173        if range.is_empty() {
1174            return Ok(Vec::new())
1175        }
1176
1177        let len = range.end().saturating_sub(*range.start()) as usize + 1;
1178        let mut blocks = Vec::with_capacity(len);
1179
1180        let headers = headers_range(range.clone())?;
1181
1182        // If the body indices are not found, this means that the transactions either do
1183        // not exist in the database yet, or they do exit but are
1184        // not indexed. If they exist but are not indexed, we don't
1185        // have enough information to return the block anyways, so
1186        // we skip the block.
1187        let present_headers = self
1188            .block_body_indices_range(range)?
1189            .into_iter()
1190            .map(|b| b.tx_num_range())
1191            .zip(headers)
1192            .collect::<Vec<_>>();
1193
1194        let mut inputs = Vec::with_capacity(present_headers.len());
1195        for (tx_range, header) in &present_headers {
1196            let transactions = if tx_range.is_empty() {
1197                Vec::new()
1198            } else {
1199                self.transactions_by_tx_range(tx_range.clone())?
1200            };
1201
1202            inputs.push((header.as_ref(), transactions));
1203        }
1204
1205        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
1206
1207        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
1208            blocks.push(assemble_block(header, body, tx_range)?);
1209        }
1210
1211        Ok(blocks)
1212    }
1213
1214    /// Returns a range of blocks from the database, along with the senders of each
1215    /// transaction in the blocks.
1216    ///
1217    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
1218    /// construct blocks from the following inputs:
1219    ///     – Header
1220    ///     - Transactions
1221    ///     – Ommers
1222    ///     – Withdrawals
1223    ///     – Senders
1224    fn block_with_senders_range<H, HF, B, BF>(
1225        &self,
1226        range: RangeInclusive<BlockNumber>,
1227        headers_range: HF,
1228        assemble_block: BF,
1229    ) -> ProviderResult<Vec<B>>
1230    where
1231        H: AsRef<HeaderTy<N>>,
1232        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
1233        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
1234    {
1235        self.block_range(range, headers_range, |header, body, tx_range| {
1236            let senders = if tx_range.is_empty() {
1237                Vec::new()
1238            } else {
1239                let known_senders: HashMap<TxNumber, Address> =
1240                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
1241
1242                let mut senders = Vec::with_capacity(body.transactions().len());
1243                for (tx_num, tx) in tx_range.zip(body.transactions()) {
1244                    match known_senders.get(&tx_num) {
1245                        None => {
1246                            // recover the sender from the transaction if not found
1247                            let sender = tx.recover_signer_unchecked()?;
1248                            senders.push(sender);
1249                        }
1250                        Some(sender) => senders.push(*sender),
1251                    }
1252                }
1253
1254                senders
1255            };
1256
1257            assemble_block(header, body, senders)
1258        })
1259    }
1260
1261    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
1262    /// [`tables::PlainAccountState`] and [`tables::PlainStorageState`] tables, based on the given
1263    /// storage and account changesets.
1264    fn populate_bundle_state(
1265        &self,
1266        account_changeset: Vec<(u64, AccountBeforeTx)>,
1267        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1268        mut get_account: impl FnMut(Address) -> ProviderResult<Option<Account>>,
1269        mut get_storage: impl FnMut(Address, StorageKey) -> ProviderResult<Option<StorageValue>>,
1270    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1271        // iterate previous value and get plain state value to create changeset
1272        // Double option around Account represent if Account state is know (first option) and
1273        // account is removed (Second Option)
1274        let mut state: BundleStateInit = HashMap::default();
1275
1276        // This is not working for blocks that are not at tip. as plain state is not the last
1277        // state of end range. We should rename the functions or add support to access
1278        // History state. Accessing history state can be tricky but we are not gaining
1279        // anything.
1280
1281        let mut reverts: RevertsInit = HashMap::default();
1282
1283        // add account changeset changes
1284        for (block_number, account_before) in account_changeset.into_iter().rev() {
1285            let AccountBeforeTx { info: old_info, address } = account_before;
1286            match state.entry(address) {
1287                hash_map::Entry::Vacant(entry) => {
1288                    let new_info = get_account(address)?;
1289                    entry.insert((old_info, new_info, HashMap::default()));
1290                }
1291                hash_map::Entry::Occupied(mut entry) => {
1292                    // overwrite old account state.
1293                    entry.get_mut().0 = old_info;
1294                }
1295            }
1296            // insert old info into reverts.
1297            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
1298        }
1299
1300        // add storage changeset changes
1301        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
1302            let BlockNumberAddress((block_number, address)) = block_and_address;
1303            // get account state or insert from plain state.
1304            let account_state = match state.entry(address) {
1305                hash_map::Entry::Vacant(entry) => {
1306                    let present_info = get_account(address)?;
1307                    entry.insert((present_info, present_info, HashMap::default()))
1308                }
1309                hash_map::Entry::Occupied(entry) => entry.into_mut(),
1310            };
1311
1312            // match storage.
1313            match account_state.2.entry(old_storage.key) {
1314                hash_map::Entry::Vacant(entry) => {
1315                    let new_storage = get_storage(address, old_storage.key)?.unwrap_or_default();
1316                    entry.insert((old_storage.value, new_storage));
1317                }
1318                hash_map::Entry::Occupied(mut entry) => {
1319                    entry.get_mut().0 = old_storage.value;
1320                }
1321            };
1322
1323            reverts
1324                .entry(block_number)
1325                .or_default()
1326                .entry(address)
1327                .or_default()
1328                .1
1329                .push(old_storage);
1330        }
1331
1332        Ok((state, reverts))
1333    }
1334
1335    /// Invokes [`populate_bundle_state`](Self::populate_bundle_state) with the given plain state
1336    /// cursors.
1337    fn populate_bundle_state_plain(
1338        &self,
1339        account_changeset: Vec<(u64, AccountBeforeTx)>,
1340        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1341        plain_accounts_cursor: &mut impl DbCursorRO<tables::PlainAccountState>,
1342        plain_storage_cursor: &mut impl DbDupCursorRO<tables::PlainStorageState>,
1343    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1344        self.populate_bundle_state(
1345            account_changeset,
1346            storage_changeset,
1347            |address| Ok(plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1)),
1348            |address, storage_key| {
1349                Ok(plain_storage_cursor
1350                    .seek_by_key_subkey(address, storage_key)?
1351                    .filter(|s| s.key == storage_key)
1352                    .map(|s| s.value))
1353            },
1354        )
1355    }
1356
1357    /// Like [`populate_bundle_state`](Self::populate_bundle_state), but reads current values from
1358    /// `HashedAccounts`/`HashedStorages`. Addresses and storage keys are hashed via `keccak256`
1359    /// for DB lookups. The output `BundleStateInit`/`RevertsInit` structures remain keyed by
1360    /// plain address and plain storage key.
1361    fn populate_bundle_state_hashed(
1362        &self,
1363        account_changeset: Vec<(u64, AccountBeforeTx)>,
1364        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1365        hashed_accounts_cursor: &mut impl DbCursorRO<tables::HashedAccounts>,
1366        hashed_storage_cursor: &mut impl DbDupCursorRO<tables::HashedStorages>,
1367    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1368        self.populate_bundle_state(
1369            account_changeset,
1370            storage_changeset,
1371            |address| Ok(hashed_accounts_cursor.seek_exact(keccak256(address))?.map(|kv| kv.1)),
1372            |address, storage_key| {
1373                let hashed_storage_key = keccak256(storage_key);
1374                Ok(hashed_storage_cursor
1375                    .seek_by_key_subkey(keccak256(address), hashed_storage_key)?
1376                    .filter(|s| s.key == hashed_storage_key)
1377                    .map(|s| s.value))
1378            },
1379        )
1380    }
1381
1382    fn populate_bundle_state_with_provider(
1383        &self,
1384        account_changeset: Vec<(u64, AccountBeforeTx)>,
1385        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
1386        state_provider: impl StateProvider,
1387    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
1388        self.populate_bundle_state(
1389            account_changeset,
1390            storage_changeset,
1391            |address| state_provider.basic_account(&address),
1392            |address, storage_key| state_provider.storage(address, storage_key),
1393        )
1394    }
1395}
1396
1397impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
1398    /// Insert history index to the database.
1399    ///
1400    /// For each updated partial key, this function retrieves the last shard from the database
1401    /// (if any), appends the new indices to it, chunks the resulting list if needed, and upserts
1402    /// the shards back into the database.
1403    ///
1404    /// This function is used by history indexing stages.
1405    fn append_history_index<P, T>(
1406        &self,
1407        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
1408        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
1409    ) -> ProviderResult<()>
1410    where
1411        P: Copy,
1412        T: Table<Value = BlockNumberList>,
1413    {
1414        // This function cannot be used with DUPSORT tables because `upsert` on DUPSORT tables
1415        // will append duplicate entries instead of updating existing ones, causing data corruption.
1416        assert!(!T::DUPSORT, "append_history_index cannot be used with DUPSORT tables");
1417
1418        let mut cursor = self.tx.cursor_write::<T>()?;
1419
1420        for (partial_key, indices) in index_updates {
1421            let last_key = sharded_key_factory(partial_key, u64::MAX);
1422            let mut last_shard = cursor
1423                .seek_exact(last_key.clone())?
1424                .map(|(_, list)| list)
1425                .unwrap_or_else(BlockNumberList::empty);
1426
1427            last_shard.append(indices).map_err(ProviderError::other)?;
1428
1429            // fast path: all indices fit in one shard
1430            if last_shard.len() <= sharded_key::NUM_OF_INDICES_IN_SHARD as u64 {
1431                cursor.upsert(last_key, &last_shard)?;
1432                continue;
1433            }
1434
1435            // slow path: rechunk into multiple shards
1436            let chunks = last_shard.iter().chunks(sharded_key::NUM_OF_INDICES_IN_SHARD);
1437            let mut chunks_peekable = chunks.into_iter().peekable();
1438
1439            while let Some(chunk) = chunks_peekable.next() {
1440                let shard = BlockNumberList::new_pre_sorted(chunk);
1441                let highest_block_number = if chunks_peekable.peek().is_some() {
1442                    shard.iter().next_back().expect("`chunks` does not return empty list")
1443                } else {
1444                    // Insert last list with `u64::MAX`.
1445                    u64::MAX
1446                };
1447
1448                cursor.upsert(sharded_key_factory(partial_key, highest_block_number), &shard)?;
1449            }
1450        }
1451
1452        Ok(())
1453    }
1454}
1455
1456impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
1457    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1458        if self.cached_storage_settings().use_hashed_state() {
1459            let hashed_address = keccak256(address);
1460            Ok(self.tx.get_by_encoded_key::<tables::HashedAccounts>(&hashed_address)?)
1461        } else {
1462            Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
1463        }
1464    }
1465}
1466
1467impl<TX: DbTx + 'static, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
1468    fn changed_accounts_with_range(
1469        &self,
1470        range: RangeInclusive<BlockNumber>,
1471    ) -> ProviderResult<BTreeSet<Address>> {
1472        let mut reader = EitherReader::new_account_changesets(self)?;
1473
1474        reader.changed_accounts_with_range(range)
1475    }
1476
1477    fn basic_accounts(
1478        &self,
1479        iter: impl IntoIterator<Item = Address>,
1480    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
1481        if self.cached_storage_settings().use_hashed_state() {
1482            let mut hashed_accounts = self.tx.cursor_read::<tables::HashedAccounts>()?;
1483            Ok(iter
1484                .into_iter()
1485                .map(|address| {
1486                    let hashed_address = keccak256(address);
1487                    hashed_accounts.seek_exact(hashed_address).map(|a| (address, a.map(|(_, v)| v)))
1488                })
1489                .collect::<Result<Vec<_>, _>>()?)
1490        } else {
1491            let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
1492            Ok(iter
1493                .into_iter()
1494                .map(|address| {
1495                    plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
1496                })
1497                .collect::<Result<Vec<_>, _>>()?)
1498        }
1499    }
1500
1501    fn changed_accounts_and_blocks_with_range(
1502        &self,
1503        range: RangeInclusive<BlockNumber>,
1504    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
1505        let highest_static_block = self
1506            .static_file_provider
1507            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
1508
1509        if let Some(highest) = highest_static_block &&
1510            self.cached_storage_settings().storage_v2
1511        {
1512            let start = *range.start();
1513            let static_end = (*range.end()).min(highest);
1514
1515            let mut changed_accounts_and_blocks: BTreeMap<_, Vec<u64>> = BTreeMap::default();
1516            if start <= static_end {
1517                for block in start..=static_end {
1518                    let block_changesets = self.account_block_changeset(block)?;
1519                    for changeset in block_changesets {
1520                        changed_accounts_and_blocks
1521                            .entry(changeset.address)
1522                            .or_default()
1523                            .push(block);
1524                    }
1525                }
1526            }
1527
1528            Ok(changed_accounts_and_blocks)
1529        } else {
1530            let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
1531
1532            let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
1533                BTreeMap::new(),
1534                |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
1535                    let (index, account) = entry?;
1536                    accounts.entry(account.address).or_default().push(index);
1537                    Ok(accounts)
1538                },
1539            )?;
1540
1541            Ok(account_transitions)
1542        }
1543    }
1544}
1545
1546impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
1547    fn storage_changeset(
1548        &self,
1549        block_number: BlockNumber,
1550    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1551        if self.cached_storage_settings().storage_v2 {
1552            self.static_file_provider.storage_changeset(block_number)
1553        } else {
1554            let range = block_number..=block_number;
1555            let storage_range = BlockNumberAddress::range(range);
1556            self.tx
1557                .cursor_dup_read::<tables::StorageChangeSets>()?
1558                .walk_range(storage_range)?
1559                .map(|r| {
1560                    let (bna, entry) = r?;
1561                    Ok((bna, entry))
1562                })
1563                .collect()
1564        }
1565    }
1566
1567    fn get_storage_before_block(
1568        &self,
1569        block_number: BlockNumber,
1570        address: Address,
1571        storage_key: B256,
1572    ) -> ProviderResult<Option<StorageEntry>> {
1573        if self.cached_storage_settings().storage_v2 {
1574            self.static_file_provider.get_storage_before_block(block_number, address, storage_key)
1575        } else {
1576            Ok(self
1577                .tx
1578                .cursor_dup_read::<tables::StorageChangeSets>()?
1579                .seek_by_key_subkey(BlockNumberAddress((block_number, address)), storage_key)?
1580                .filter(|entry| entry.key == storage_key))
1581        }
1582    }
1583
1584    fn storage_changesets_range(
1585        &self,
1586        range: impl RangeBounds<BlockNumber>,
1587    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1588        if self.cached_storage_settings().storage_v2 {
1589            self.static_file_provider.storage_changesets_range(range)
1590        } else {
1591            self.tx
1592                .cursor_dup_read::<tables::StorageChangeSets>()?
1593                .walk_range(BlockNumberAddressRange::from(range))?
1594                .map(|r| {
1595                    let (bna, entry) = r?;
1596                    Ok((bna, entry))
1597                })
1598                .collect()
1599        }
1600    }
1601}
1602
1603impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
1604    fn account_block_changeset(
1605        &self,
1606        block_number: BlockNumber,
1607    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1608        if self.cached_storage_settings().storage_v2 {
1609            let static_changesets =
1610                self.static_file_provider.account_block_changeset(block_number)?;
1611            Ok(static_changesets)
1612        } else {
1613            let range = block_number..=block_number;
1614            self.tx
1615                .cursor_read::<tables::AccountChangeSets>()?
1616                .walk_range(range)?
1617                .map(|result| -> ProviderResult<_> {
1618                    let (_, account_before) = result?;
1619                    Ok(account_before)
1620                })
1621                .collect()
1622        }
1623    }
1624
1625    fn get_account_before_block(
1626        &self,
1627        block_number: BlockNumber,
1628        address: Address,
1629    ) -> ProviderResult<Option<AccountBeforeTx>> {
1630        if self.cached_storage_settings().storage_v2 {
1631            Ok(self.static_file_provider.get_account_before_block(block_number, address)?)
1632        } else {
1633            self.tx
1634                .cursor_dup_read::<tables::AccountChangeSets>()?
1635                .seek_by_key_subkey(block_number, address)?
1636                .filter(|acc| acc.address == address)
1637                .map(Ok)
1638                .transpose()
1639        }
1640    }
1641
1642    fn account_changesets_range(
1643        &self,
1644        range: impl core::ops::RangeBounds<BlockNumber>,
1645    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1646        if self.cached_storage_settings().storage_v2 {
1647            self.static_file_provider.account_changesets_range(range)
1648        } else {
1649            self.tx
1650                .cursor_read::<tables::AccountChangeSets>()?
1651                .walk_range(to_range(range))?
1652                .map(|r| r.map_err(Into::into))
1653                .collect()
1654        }
1655    }
1656}
1657
1658impl<Tx: DbTx + 'static, N: NodeTypesForProvider> StateReader for DatabaseProvider<Tx, N> {
1659    type Receipt = ReceiptTy<N>;
1660
1661    fn get_state(
1662        &self,
1663        block: BlockNumber,
1664    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1665        let Some(block_body) = self.block_body_indices(block)? else { return Ok(None) };
1666
1667        let from_transaction_num = block_body.first_tx_num();
1668        let to_transaction_num = block_body.last_tx_num();
1669
1670        let account_changeset = self.account_changesets_range(block..=block)?;
1671        let storage_changeset = self.storage_changeset(block)?;
1672
1673        let Some(block_hash) = self.block_hash(block)? else { return Ok(None) };
1674        let state_provider = self.history_by_block_hash(block_hash)?;
1675        let (state, reverts) = self.populate_bundle_state_with_provider(
1676            account_changeset,
1677            storage_changeset,
1678            state_provider,
1679        )?;
1680
1681        let receipts = self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?;
1682
1683        Ok(Some(ExecutionOutcome::new_init(
1684            state,
1685            reverts,
1686            // We skip new contracts since we never delete them from the database
1687            Vec::new(),
1688            vec![receipts],
1689            block,
1690            Vec::new(),
1691        )))
1692    }
1693}
1694
1695impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
1696    for DatabaseProvider<TX, N>
1697{
1698    type Header = HeaderTy<N>;
1699
1700    fn local_tip_header(
1701        &self,
1702        highest_uninterrupted_block: BlockNumber,
1703    ) -> ProviderResult<SealedHeader<Self::Header>> {
1704        let static_file_provider = self.static_file_provider();
1705
1706        // Make sure Headers static file is at the same height. If it's further, this
1707        // input execution was interrupted previously and we need to unwind the static file.
1708        let next_static_file_block_num = static_file_provider
1709            .get_highest_static_file_block(StaticFileSegment::Headers)
1710            .map(|id| id + 1)
1711            .unwrap_or_default();
1712        let next_block = highest_uninterrupted_block + 1;
1713
1714        match next_static_file_block_num.cmp(&next_block) {
1715            // The node shutdown between an executed static file commit and before the database
1716            // commit, so we need to unwind the static files.
1717            Ordering::Greater => {
1718                let mut static_file_producer =
1719                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
1720                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
1721                // Since this is a database <-> static file inconsistency, we commit the change
1722                // straight away.
1723                static_file_producer.commit()?
1724            }
1725            Ordering::Less => {
1726                // There's either missing or corrupted files.
1727                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
1728            }
1729            Ordering::Equal => {}
1730        }
1731
1732        let local_head = static_file_provider
1733            .sealed_header(highest_uninterrupted_block)?
1734            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
1735
1736        Ok(local_head)
1737    }
1738}
1739
1740impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1741    type Header = HeaderTy<N>;
1742
1743    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1744        if let Some(num) = self.block_number(block_hash)? {
1745            Ok(self.header_by_number(num)?)
1746        } else {
1747            Ok(None)
1748        }
1749    }
1750
1751    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1752        self.static_file_provider.header_by_number(num)
1753    }
1754
1755    fn headers_range(
1756        &self,
1757        range: impl RangeBounds<BlockNumber>,
1758    ) -> ProviderResult<Vec<Self::Header>> {
1759        self.static_file_provider.headers_range(range)
1760    }
1761
1762    fn sealed_header(
1763        &self,
1764        number: BlockNumber,
1765    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1766        self.static_file_provider.sealed_header(number)
1767    }
1768
1769    fn sealed_headers_while(
1770        &self,
1771        range: impl RangeBounds<BlockNumber>,
1772        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1773    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1774        self.static_file_provider.sealed_headers_while(range, predicate)
1775    }
1776}
1777
1778impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1779    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1780        self.static_file_provider.block_hash(number)
1781    }
1782
1783    fn canonical_hashes_range(
1784        &self,
1785        start: BlockNumber,
1786        end: BlockNumber,
1787    ) -> ProviderResult<Vec<B256>> {
1788        self.static_file_provider.canonical_hashes_range(start, end)
1789    }
1790}
1791
1792impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1793    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1794        let best_number = self.best_block_number()?;
1795        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1796        Ok(ChainInfo { best_hash, best_number })
1797    }
1798
1799    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1800        // The best block number is tracked via the finished stage which gets updated in the same tx
1801        // when new blocks committed
1802        Ok(self
1803            .get_stage_checkpoint(StageId::Finish)?
1804            .map(|checkpoint| checkpoint.block_number)
1805            .unwrap_or_default())
1806    }
1807
1808    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1809        self.static_file_provider.last_block_number()
1810    }
1811
1812    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1813        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1814    }
1815}
1816
1817impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1818    type Block = BlockTy<N>;
1819
1820    fn find_block_by_hash(
1821        &self,
1822        hash: B256,
1823        source: BlockSource,
1824    ) -> ProviderResult<Option<Self::Block>> {
1825        if source.is_canonical() {
1826            self.block(hash.into())
1827        } else {
1828            Ok(None)
1829        }
1830    }
1831
1832    /// Returns the block with matching number from database.
1833    ///
1834    /// If the header for this block is not found, this returns `None`.
1835    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1836    /// will return None.
1837    ///
1838    /// Returns an error if the requested block is below the earliest available history.
1839    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1840        if let Some(number) = self.convert_hash_or_number(id)? {
1841            let earliest_available = self.static_file_provider.earliest_history_height();
1842            if number < earliest_available {
1843                return Err(ProviderError::BlockExpired { requested: number, earliest_available })
1844            }
1845
1846            let Some(header) = self.header_by_number(number)? else { return Ok(None) };
1847
1848            // If the body indices are not found, this means that the transactions either do not
1849            // exist in the database yet, or they do exit but are not indexed.
1850            // If they exist but are not indexed, we don't have enough
1851            // information to return the block anyways, so we return `None`.
1852            let Some(transactions) = self.transactions_by_block(number.into())? else {
1853                return Ok(None)
1854            };
1855
1856            let body = self
1857                .storage
1858                .reader()
1859                .read_block_bodies(self, vec![(&header, transactions)])?
1860                .pop()
1861                .ok_or(ProviderError::InvalidStorageOutput)?;
1862
1863            return Ok(Some(Self::Block::new(header, body)))
1864        }
1865
1866        Ok(None)
1867    }
1868
1869    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1870        Ok(None)
1871    }
1872
1873    fn pending_block_and_receipts(
1874        &self,
1875    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1876        Ok(None)
1877    }
1878
1879    /// Returns the block with senders with matching number or hash from database.
1880    ///
1881    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1882    /// spot, and we want fast querying.**
1883    ///
1884    /// If the header for this block is not found, this returns `None`.
1885    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1886    /// will return None.
1887    fn recovered_block(
1888        &self,
1889        id: BlockHashOrNumber,
1890        transaction_kind: TransactionVariant,
1891    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1892        self.recovered_block(
1893            id,
1894            transaction_kind,
1895            |block_number| self.header_by_number(block_number),
1896            |header, body, senders| {
1897                Self::Block::new(header, body)
1898                    // Note: we're using unchecked here because we know the block contains valid txs
1899                    // wrt to its height and can ignore the s value check so pre
1900                    // EIP-2 txs are allowed
1901                    .try_into_recovered_unchecked(senders)
1902                    .map(Some)
1903                    .map_err(|_| ProviderError::SenderRecoveryError)
1904            },
1905        )
1906    }
1907
1908    fn sealed_block_with_senders(
1909        &self,
1910        id: BlockHashOrNumber,
1911        transaction_kind: TransactionVariant,
1912    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1913        self.recovered_block(
1914            id,
1915            transaction_kind,
1916            |block_number| self.sealed_header(block_number),
1917            |header, body, senders| {
1918                Self::Block::new_sealed(header, body)
1919                    // Note: we're using unchecked here because we know the block contains valid txs
1920                    // wrt to its height and can ignore the s value check so pre
1921                    // EIP-2 txs are allowed
1922                    .try_with_senders_unchecked(senders)
1923                    .map(Some)
1924                    .map_err(|_| ProviderError::SenderRecoveryError)
1925            },
1926        )
1927    }
1928
1929    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1930        self.block_range(
1931            range,
1932            |range| self.headers_range(range),
1933            |header, body, _| Ok(Self::Block::new(header, body)),
1934        )
1935    }
1936
1937    fn block_with_senders_range(
1938        &self,
1939        range: RangeInclusive<BlockNumber>,
1940    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1941        self.block_with_senders_range(
1942            range,
1943            |range| self.headers_range(range),
1944            |header, body, senders| {
1945                Self::Block::new(header, body)
1946                    .try_into_recovered_unchecked(senders)
1947                    .map_err(|_| ProviderError::SenderRecoveryError)
1948            },
1949        )
1950    }
1951
1952    fn recovered_block_range(
1953        &self,
1954        range: RangeInclusive<BlockNumber>,
1955    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1956        self.block_with_senders_range(
1957            range,
1958            |range| self.sealed_headers_range(range),
1959            |header, body, senders| {
1960                Self::Block::new_sealed(header, body)
1961                    .try_with_senders(senders)
1962                    .map_err(|_| ProviderError::SenderRecoveryError)
1963            },
1964        )
1965    }
1966
1967    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1968        Ok(self
1969            .tx
1970            .cursor_read::<tables::TransactionBlocks>()?
1971            .seek(id)
1972            .map(|b| b.map(|(_, bn)| bn))?)
1973    }
1974}
1975
1976impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1977    for DatabaseProvider<TX, N>
1978{
1979    /// Recovers transaction hashes by walking through `Transactions` table and
1980    /// calculating them in a parallel manner. Returned unsorted.
1981    fn transaction_hashes_by_range(
1982        &self,
1983        tx_range: Range<TxNumber>,
1984    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1985        self.static_file_provider.transaction_hashes_by_range(tx_range)
1986    }
1987}
1988
1989// Calculates the hash of the given transaction
1990impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1991    type Transaction = TxTy<N>;
1992
1993    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1994        self.with_rocksdb_snapshot(|rocksdb_ref| {
1995            let mut reader = EitherReader::new_transaction_hash_numbers(self, rocksdb_ref)?;
1996            reader.get_transaction_hash_number(tx_hash)
1997        })
1998    }
1999
2000    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2001        self.static_file_provider.transaction_by_id(id)
2002    }
2003
2004    fn transaction_by_id_unhashed(
2005        &self,
2006        id: TxNumber,
2007    ) -> ProviderResult<Option<Self::Transaction>> {
2008        self.static_file_provider.transaction_by_id_unhashed(id)
2009    }
2010
2011    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2012        if let Some(id) = self.transaction_id(hash)? {
2013            Ok(self.transaction_by_id_unhashed(id)?)
2014        } else {
2015            Ok(None)
2016        }
2017    }
2018
2019    fn transaction_by_hash_with_meta(
2020        &self,
2021        tx_hash: TxHash,
2022    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2023        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
2024            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
2025            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
2026            let Some(sealed_header) = self.sealed_header(block_number)?
2027        {
2028            let (header, block_hash) = sealed_header.split();
2029            if let Some(block_body) = self.block_body_indices(block_number)? {
2030                // the index of the tx in the block is the offset:
2031                // len([start..tx_id])
2032                // NOTE: `transaction_id` is always `>=` the block's first
2033                // index
2034                let index = transaction_id - block_body.first_tx_num();
2035
2036                let meta = TransactionMeta {
2037                    tx_hash,
2038                    index,
2039                    block_hash,
2040                    block_number,
2041                    base_fee: header.base_fee_per_gas(),
2042                    excess_blob_gas: header.excess_blob_gas(),
2043                    timestamp: header.timestamp(),
2044                };
2045
2046                return Ok(Some((transaction, meta)))
2047            }
2048        }
2049
2050        Ok(None)
2051    }
2052
2053    fn transactions_by_block(
2054        &self,
2055        id: BlockHashOrNumber,
2056    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2057        if let Some(block_number) = self.convert_hash_or_number(id)? &&
2058            let Some(body) = self.block_body_indices(block_number)?
2059        {
2060            let tx_range = body.tx_num_range();
2061            return if tx_range.is_empty() {
2062                Ok(Some(Vec::new()))
2063            } else {
2064                self.transactions_by_tx_range(tx_range).map(Some)
2065            }
2066        }
2067        Ok(None)
2068    }
2069
2070    fn transactions_by_block_range(
2071        &self,
2072        range: impl RangeBounds<BlockNumber>,
2073    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2074        let range = to_range(range);
2075
2076        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
2077            .into_iter()
2078            .map(|body| {
2079                let tx_num_range = body.tx_num_range();
2080                if tx_num_range.is_empty() {
2081                    Ok(Vec::new())
2082                } else {
2083                    self.transactions_by_tx_range(tx_num_range)
2084                }
2085            })
2086            .collect()
2087    }
2088
2089    fn transactions_by_tx_range(
2090        &self,
2091        range: impl RangeBounds<TxNumber>,
2092    ) -> ProviderResult<Vec<Self::Transaction>> {
2093        self.static_file_provider.transactions_by_tx_range(range)
2094    }
2095
2096    fn senders_by_tx_range(
2097        &self,
2098        range: impl RangeBounds<TxNumber>,
2099    ) -> ProviderResult<Vec<Address>> {
2100        if EitherWriterDestination::senders(self).is_static_file() {
2101            self.static_file_provider.senders_by_tx_range(range)
2102        } else {
2103            self.cursor_read_collect::<tables::TransactionSenders>(range)
2104        }
2105    }
2106
2107    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2108        if EitherWriterDestination::senders(self).is_static_file() {
2109            self.static_file_provider.transaction_sender(id)
2110        } else {
2111            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
2112        }
2113    }
2114}
2115
2116impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
2117    type Receipt = ReceiptTy<N>;
2118
2119    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2120        self.static_file_provider.get_with_static_file_or_database(
2121            StaticFileSegment::Receipts,
2122            id,
2123            |static_file| static_file.receipt(id),
2124            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
2125        )
2126    }
2127
2128    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2129        if let Some(id) = self.transaction_id(hash)? {
2130            self.receipt(id)
2131        } else {
2132            Ok(None)
2133        }
2134    }
2135
2136    fn receipts_by_block(
2137        &self,
2138        block: BlockHashOrNumber,
2139    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2140        if let Some(number) = self.convert_hash_or_number(block)? &&
2141            let Some(body) = self.block_body_indices(number)?
2142        {
2143            let tx_range = body.tx_num_range();
2144            return if tx_range.is_empty() {
2145                Ok(Some(Vec::new()))
2146            } else {
2147                let receipts = self.receipts_by_tx_range(tx_range)?;
2148
2149                if receipts.len() != body.tx_count as usize {
2150                    return Ok(None)
2151                }
2152
2153                Ok(Some(receipts))
2154            }
2155        }
2156        Ok(None)
2157    }
2158
2159    fn receipts_by_tx_range(
2160        &self,
2161        range: impl RangeBounds<TxNumber>,
2162    ) -> ProviderResult<Vec<Self::Receipt>> {
2163        self.static_file_provider.get_range_with_static_file_or_database(
2164            StaticFileSegment::Receipts,
2165            to_range(range),
2166            |static_file, range, _| static_file.receipts_by_tx_range(range),
2167            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
2168            |_| true,
2169        )
2170    }
2171
2172    fn receipts_by_block_range(
2173        &self,
2174        block_range: RangeInclusive<BlockNumber>,
2175    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2176        if block_range.is_empty() {
2177            return Ok(Vec::new());
2178        }
2179
2180        // collect block body indices for each block in the range
2181        let range_len = block_range.end().saturating_sub(*block_range.start()) as usize + 1;
2182        let mut block_body_indices = Vec::with_capacity(range_len);
2183        for block_num in block_range {
2184            if let Some(indices) = self.block_body_indices(block_num)? {
2185                block_body_indices.push(indices);
2186            } else {
2187                // use default indices for missing blocks (empty block)
2188                block_body_indices.push(StoredBlockBodyIndices::default());
2189            }
2190        }
2191
2192        if block_body_indices.is_empty() {
2193            return Ok(Vec::new());
2194        }
2195
2196        // find blocks with transactions to determine transaction range
2197        let non_empty_blocks: Vec<_> =
2198            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
2199
2200        if non_empty_blocks.is_empty() {
2201            // all blocks are empty
2202            return Ok(vec![Vec::new(); block_body_indices.len()]);
2203        }
2204
2205        // calculate the overall transaction range
2206        let first_tx = non_empty_blocks[0].first_tx_num();
2207        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
2208
2209        // fetch all receipts in the transaction range
2210        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
2211        let mut receipts_iter = all_receipts.into_iter();
2212
2213        // distribute receipts to their respective blocks
2214        let mut result = Vec::with_capacity(block_body_indices.len());
2215        for indices in &block_body_indices {
2216            if indices.tx_count == 0 {
2217                result.push(Vec::new());
2218            } else {
2219                let block_receipts =
2220                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
2221                result.push(block_receipts);
2222            }
2223        }
2224
2225        Ok(result)
2226    }
2227}
2228
2229impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
2230    for DatabaseProvider<TX, N>
2231{
2232    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2233        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
2234    }
2235
2236    fn block_body_indices_range(
2237        &self,
2238        range: RangeInclusive<BlockNumber>,
2239    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2240        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
2241    }
2242}
2243
2244impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
2245    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
2246        Ok(if let Some(encoded) = id.get_pre_encoded() {
2247            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
2248        } else {
2249            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
2250        })
2251    }
2252
2253    /// Get stage checkpoint progress.
2254    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
2255        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
2256    }
2257
2258    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
2259        self.tx
2260            .cursor_read::<tables::StageCheckpoints>()?
2261            .walk(None)?
2262            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
2263            .map_err(ProviderError::Database)
2264    }
2265}
2266
2267impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
2268    /// Save stage checkpoint.
2269    fn save_stage_checkpoint(
2270        &self,
2271        id: StageId,
2272        checkpoint: StageCheckpoint,
2273    ) -> ProviderResult<()> {
2274        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
2275    }
2276
2277    /// Save stage checkpoint progress.
2278    fn save_stage_checkpoint_progress(
2279        &self,
2280        id: StageId,
2281        checkpoint: Vec<u8>,
2282    ) -> ProviderResult<()> {
2283        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
2284    }
2285
2286    #[instrument(level = "debug", target = "providers::db", skip_all)]
2287    fn update_pipeline_stages(
2288        &self,
2289        block_number: BlockNumber,
2290        drop_stage_checkpoint: bool,
2291    ) -> ProviderResult<()> {
2292        // iterate over all existing stages in the table and update its progress.
2293        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
2294        for stage_id in StageId::ALL {
2295            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
2296            cursor.upsert(
2297                stage_id.to_string(),
2298                &StageCheckpoint {
2299                    block_number,
2300                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
2301                },
2302            )?;
2303        }
2304
2305        Ok(())
2306    }
2307}
2308
2309impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
2310    fn plain_state_storages(
2311        &self,
2312        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
2313    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
2314        if self.cached_storage_settings().use_hashed_state() {
2315            let mut hashed_storage = self.tx.cursor_dup_read::<tables::HashedStorages>()?;
2316
2317            addresses_with_keys
2318                .into_iter()
2319                .map(|(address, storage)| {
2320                    let hashed_address = keccak256(address);
2321                    storage
2322                        .into_iter()
2323                        .map(|key| -> ProviderResult<_> {
2324                            let hashed_key = keccak256(key);
2325                            let value = hashed_storage
2326                                .seek_by_key_subkey(hashed_address, hashed_key)?
2327                                .filter(|v| v.key == hashed_key)
2328                                .map(|v| v.value)
2329                                .unwrap_or_default();
2330                            Ok(StorageEntry { key, value })
2331                        })
2332                        .collect::<ProviderResult<Vec<_>>>()
2333                        .map(|storage| (address, storage))
2334                })
2335                .collect::<ProviderResult<Vec<(_, _)>>>()
2336        } else {
2337            let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
2338
2339            addresses_with_keys
2340                .into_iter()
2341                .map(|(address, storage)| {
2342                    storage
2343                        .into_iter()
2344                        .map(|key| -> ProviderResult<_> {
2345                            Ok(plain_storage
2346                                .seek_by_key_subkey(address, key)?
2347                                .filter(|v| v.key == key)
2348                                .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
2349                        })
2350                        .collect::<ProviderResult<Vec<_>>>()
2351                        .map(|storage| (address, storage))
2352                })
2353                .collect::<ProviderResult<Vec<(_, _)>>>()
2354        }
2355    }
2356
2357    fn changed_storages_with_range(
2358        &self,
2359        range: RangeInclusive<BlockNumber>,
2360    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
2361        if self.cached_storage_settings().storage_v2 {
2362            self.storage_changesets_range(range)?.into_iter().try_fold(
2363                BTreeMap::new(),
2364                |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2365                    let (BlockNumberAddress((_, address)), storage_entry) = entry;
2366                    accounts.entry(address).or_default().insert(storage_entry.key);
2367                    Ok(accounts)
2368                },
2369            )
2370        } else {
2371            self.tx
2372                .cursor_read::<tables::StorageChangeSets>()?
2373                .walk_range(BlockNumberAddress::range(range))?
2374                // fold all storages and save its old state so we can remove it from HashedStorage
2375                // it is needed as it is dup table.
2376                .try_fold(
2377                    BTreeMap::new(),
2378                    |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
2379                        let (BlockNumberAddress((_, address)), storage_entry) = entry?;
2380                        accounts.entry(address).or_default().insert(storage_entry.key);
2381                        Ok(accounts)
2382                    },
2383                )
2384        }
2385    }
2386
2387    fn changed_storages_and_blocks_with_range(
2388        &self,
2389        range: RangeInclusive<BlockNumber>,
2390    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
2391        if self.cached_storage_settings().storage_v2 {
2392            self.storage_changesets_range(range)?.into_iter().try_fold(
2393                BTreeMap::new(),
2394                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, (index, storage)| {
2395                    storages
2396                        .entry((index.address(), storage.key))
2397                        .or_default()
2398                        .push(index.block_number());
2399                    Ok(storages)
2400                },
2401            )
2402        } else {
2403            let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
2404
2405            let storage_changeset_lists =
2406                changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
2407                    BTreeMap::new(),
2408                    |mut storages: BTreeMap<(Address, B256), Vec<u64>>,
2409                     entry|
2410                     -> ProviderResult<_> {
2411                        let (index, storage) = entry?;
2412                        storages
2413                            .entry((index.address(), storage.key))
2414                            .or_default()
2415                            .push(index.block_number());
2416                        Ok(storages)
2417                    },
2418                )?;
2419
2420            Ok(storage_changeset_lists)
2421        }
2422    }
2423}
2424
2425impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
2426    for DatabaseProvider<TX, N>
2427{
2428    type Receipt = ReceiptTy<N>;
2429
2430    #[instrument(level = "debug", target = "providers::db", skip_all)]
2431    fn write_state<'a>(
2432        &self,
2433        execution_outcome: impl Into<WriteStateInput<'a, Self::Receipt>>,
2434        is_value_known: OriginalValuesKnown,
2435        config: StateWriteConfig,
2436    ) -> ProviderResult<()> {
2437        let execution_outcome = execution_outcome.into();
2438
2439        if self.cached_storage_settings().use_hashed_state() &&
2440            !config.write_receipts &&
2441            !config.write_account_changesets &&
2442            !config.write_storage_changesets
2443        {
2444            // In storage v2 with all outputs directed to static files, plain state and changesets
2445            // are written elsewhere. Only bytecodes need MDBX writes, so skip the expensive
2446            // to_plain_state_and_reverts conversion that iterates all accounts and storage.
2447            self.write_bytecodes(
2448                execution_outcome.state().contracts.iter().map(|(h, b)| (*h, Bytecode(b.clone()))),
2449            )?;
2450            return Ok(());
2451        }
2452
2453        let first_block = execution_outcome.first_block();
2454        let (plain_state, reverts) =
2455            execution_outcome.state().to_plain_state_and_reverts(is_value_known);
2456
2457        self.write_state_reverts(reverts, first_block, config)?;
2458        self.write_state_changes(plain_state)?;
2459
2460        if !config.write_receipts {
2461            return Ok(());
2462        }
2463
2464        let block_count = execution_outcome.len() as u64;
2465        let last_block = execution_outcome.last_block();
2466        let block_range = first_block..=last_block;
2467
2468        let tip = self.last_block_number()?.max(last_block);
2469
2470        // Fetch the first transaction number for each block in the range
2471        let block_indices: Vec<_> = self
2472            .block_body_indices_range(block_range)?
2473            .into_iter()
2474            .map(|b| b.first_tx_num)
2475            .collect();
2476
2477        // Ensure all expected blocks are present.
2478        if block_indices.len() < block_count as usize {
2479            let missing_blocks = block_count - block_indices.len() as u64;
2480            return Err(ProviderError::BlockBodyIndicesNotFound(
2481                last_block.saturating_sub(missing_blocks - 1),
2482            ));
2483        }
2484
2485        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
2486
2487        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
2488        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
2489
2490        // All receipts from the last 128 blocks are required for blockchain tree, even with
2491        // [`PruneSegment::ContractLogs`].
2492        //
2493        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
2494        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
2495        // files yet. Once receipts exist in static files, we must continue writing to maintain
2496        // continuity and have no gaps.
2497        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
2498            self.static_file_provider()
2499                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2500                .is_none()) &&
2501            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
2502
2503        // Prepare set of addresses which logs should not be pruned.
2504        let mut allowed_addresses: AddressSet = AddressSet::default();
2505        for (_, addresses) in contract_log_pruner.range(..first_block) {
2506            allowed_addresses.extend(addresses.iter().copied());
2507        }
2508
2509        for (idx, (receipts, first_tx_index)) in
2510            execution_outcome.receipts().zip(block_indices).enumerate()
2511        {
2512            let block_number = first_block + idx as u64;
2513
2514            // Increment block number for receipts static file writer
2515            receipts_writer.increment_block(block_number)?;
2516
2517            // Skip writing receipts if pruning configuration requires us to.
2518            if prunable_receipts &&
2519                self.prune_modes
2520                    .receipts
2521                    .is_some_and(|mode| mode.should_prune(block_number, tip))
2522            {
2523                continue
2524            }
2525
2526            // If there are new addresses to retain after this block number, track them
2527            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
2528                allowed_addresses.extend(new_addresses.iter().copied());
2529            }
2530
2531            for (idx, receipt) in receipts.iter().enumerate() {
2532                let receipt_idx = first_tx_index + idx as u64;
2533                // Skip writing receipt if log filter is active and it does not have any logs to
2534                // retain
2535                if prunable_receipts &&
2536                    has_contract_log_filter &&
2537                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
2538                {
2539                    continue
2540                }
2541
2542                receipts_writer.append_receipt(receipt_idx, receipt)?;
2543            }
2544        }
2545
2546        Ok(())
2547    }
2548
2549    fn write_state_reverts(
2550        &self,
2551        reverts: PlainStateReverts,
2552        first_block: BlockNumber,
2553        config: StateWriteConfig,
2554    ) -> ProviderResult<()> {
2555        // Write storage changes
2556        if config.write_storage_changesets {
2557            tracing::trace!("Writing storage changes");
2558            let mut storages_cursor =
2559                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2560            for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
2561                let block_number = first_block + block_index as BlockNumber;
2562
2563                tracing::trace!(block_number, "Writing block change");
2564                // sort changes by address.
2565                storage_changes.par_sort_unstable_by_key(|a| a.address);
2566                let total_changes =
2567                    storage_changes.iter().map(|change| change.storage_revert.len()).sum();
2568                let mut changeset = Vec::with_capacity(total_changes);
2569                for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
2570                    let mut storage = storage_revert
2571                        .into_iter()
2572                        .map(|(k, v)| (B256::from(k.to_be_bytes()), v))
2573                        .collect::<Vec<_>>();
2574                    // sort storage slots by key.
2575                    storage.par_sort_unstable_by_key(|a| a.0);
2576
2577                    // If we are writing the primary storage wipe transition, the pre-existing
2578                    // storage state has to be taken from the database and written to storage
2579                    // history. See [StorageWipe::Primary] for more details.
2580                    //
2581                    // TODO(mediocregopher): This could be rewritten in a way which doesn't
2582                    // require collecting wiped entries into a Vec like this, see
2583                    // `write_storage_trie_changesets`.
2584                    let mut wiped_storage = Vec::new();
2585                    if wiped {
2586                        tracing::trace!(?address, "Wiping storage");
2587                        if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
2588                            wiped_storage.push((entry.key, entry.value));
2589                            while let Some(entry) = storages_cursor.next_dup_val()? {
2590                                wiped_storage.push((entry.key, entry.value))
2591                            }
2592                        }
2593                    }
2594
2595                    tracing::trace!(?address, ?storage, "Writing storage reverts");
2596                    for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
2597                        changeset.push(StorageBeforeTx { address, key, value });
2598                    }
2599                }
2600
2601                let mut storage_changesets_writer =
2602                    EitherWriter::new_storage_changesets(self, block_number)?;
2603                storage_changesets_writer.append_storage_changeset(block_number, changeset)?;
2604            }
2605        }
2606
2607        if !config.write_account_changesets {
2608            return Ok(());
2609        }
2610
2611        // Write account changes
2612        tracing::trace!(?first_block, "Writing account changes");
2613        for (block_index, account_block_reverts) in reverts.accounts.into_iter().enumerate() {
2614            let block_number = first_block + block_index as BlockNumber;
2615            let changeset = account_block_reverts
2616                .into_iter()
2617                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
2618                .collect::<Vec<_>>();
2619            let mut account_changesets_writer =
2620                EitherWriter::new_account_changesets(self, block_number)?;
2621
2622            account_changesets_writer.append_account_changeset(block_number, changeset)?;
2623        }
2624
2625        Ok(())
2626    }
2627
2628    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
2629        // sort all entries so they can be written to database in more performant way.
2630        // and take smaller memory footprint.
2631        changes.accounts.par_sort_by_key(|a| a.0);
2632        changes.storage.par_sort_by_key(|a| a.address);
2633        changes.contracts.par_sort_by_key(|a| a.0);
2634
2635        if !self.cached_storage_settings().use_hashed_state() {
2636            // Write new account state
2637            tracing::trace!(len = changes.accounts.len(), "Writing new account state");
2638            let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
2639            // write account to database.
2640            for (address, account) in changes.accounts {
2641                if let Some(account) = account {
2642                    tracing::trace!(?address, "Updating plain state account");
2643                    accounts_cursor.upsert(address, &account.into())?;
2644                } else if accounts_cursor.seek_exact(address)?.is_some() {
2645                    tracing::trace!(?address, "Deleting plain state account");
2646                    accounts_cursor.delete_current()?;
2647                }
2648            }
2649
2650            // Write new storage state and wipe storage if needed.
2651            tracing::trace!(len = changes.storage.len(), "Writing new storage state");
2652            let mut storages_cursor =
2653                self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
2654            for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2655                // Wiping of storage.
2656                if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2657                    storages_cursor.delete_current_duplicates()?;
2658                }
2659                // cast storages to B256.
2660                let mut storage = storage
2661                    .into_iter()
2662                    .map(|(k, value)| StorageEntry { key: k.into(), value })
2663                    .collect::<Vec<_>>();
2664                // sort storage slots by key.
2665                storage.par_sort_unstable_by_key(|a| a.key);
2666
2667                for entry in storage {
2668                    tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2669                    if let Some(db_entry) =
2670                        storages_cursor.seek_by_key_subkey(address, entry.key)? &&
2671                        db_entry.key == entry.key
2672                    {
2673                        storages_cursor.delete_current()?;
2674                    }
2675
2676                    if !entry.value.is_zero() {
2677                        storages_cursor.upsert(address, &entry)?;
2678                    }
2679                }
2680            }
2681        }
2682
2683        // Write bytecode
2684        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
2685        self.write_bytecodes(
2686            changes.contracts.into_iter().map(|(hash, bytecode)| (hash, Bytecode(bytecode))),
2687        )?;
2688
2689        Ok(())
2690    }
2691
2692    #[instrument(level = "debug", target = "providers::db", skip_all)]
2693    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2694        // Write hashed account updates.
2695        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2696        for (hashed_address, account) in hashed_state.accounts() {
2697            if let Some(account) = account {
2698                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2699            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2700                hashed_accounts_cursor.delete_current()?;
2701            }
2702        }
2703
2704        // Write hashed storage changes.
2705        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2706        let mut hashed_storage_cursor =
2707            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2708        for (hashed_address, storage) in sorted_storages {
2709            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2710                hashed_storage_cursor.delete_current_duplicates()?;
2711            }
2712
2713            for (hashed_slot, value) in storage.storage_slots_ref() {
2714                let entry = StorageEntry { key: *hashed_slot, value: *value };
2715
2716                if let Some(db_entry) =
2717                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
2718                    db_entry.key == entry.key
2719                {
2720                    hashed_storage_cursor.delete_current()?;
2721                }
2722
2723                if !entry.value.is_zero() {
2724                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2725                }
2726            }
2727        }
2728
2729        Ok(())
2730    }
2731
2732    /// Remove the last N blocks of state.
2733    ///
2734    /// The latest state will be unwound
2735    ///
2736    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2737    ///    transaction ids.
2738    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2739    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2740    ///    the changesets.
2741    ///    - In order to have both the old and new values in the changesets, we also access the
2742    ///      plain state tables.
2743    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2744    ///    we:
2745    ///     1. Take the old value from the changeset
2746    ///     2. Take the new value from the plain state
2747    ///     3. Save the old value to the local state
2748    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2749    ///    have seen before we:
2750    ///     1. Take the old value from the changeset
2751    ///     2. Take the new value from the local state
2752    ///     3. Set the local state to the value in the changeset
2753    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
2754        let range = block + 1..=self.last_block_number()?;
2755
2756        if range.is_empty() {
2757            return Ok(());
2758        }
2759
2760        // We are not removing block meta as it is used to get block changesets.
2761        let block_bodies = self.block_body_indices_range(range.clone())?;
2762
2763        // get transaction receipts
2764        let from_transaction_num =
2765            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2766
2767        let storage_range = BlockNumberAddress::range(range.clone());
2768        let storage_changeset = if self.cached_storage_settings().storage_v2 {
2769            let changesets = self.storage_changesets_range(range.clone())?;
2770            let mut changeset_writer =
2771                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2772            changeset_writer.prune_storage_changesets(block)?;
2773            changesets
2774        } else {
2775            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2776        };
2777        let account_changeset = if self.cached_storage_settings().storage_v2 {
2778            let changesets = self.account_changesets_range(range)?;
2779            let mut changeset_writer =
2780                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2781            changeset_writer.prune_account_changesets(block)?;
2782            changesets
2783        } else {
2784            self.take::<tables::AccountChangeSets>(range)?
2785        };
2786
2787        if self.cached_storage_settings().use_hashed_state() {
2788            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2789            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2790
2791            let (state, _) = self.populate_bundle_state_hashed(
2792                account_changeset,
2793                storage_changeset,
2794                &mut hashed_accounts_cursor,
2795                &mut hashed_storage_cursor,
2796            )?;
2797
2798            for (address, (old_account, new_account, storage)) in &state {
2799                if old_account != new_account {
2800                    let hashed_address = keccak256(address);
2801                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2802                    if let Some(account) = old_account {
2803                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2804                    } else if existing_entry.is_some() {
2805                        hashed_accounts_cursor.delete_current()?;
2806                    }
2807                }
2808
2809                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2810                    let hashed_address = keccak256(address);
2811                    let hashed_storage_key = keccak256(storage_key);
2812                    let storage_entry =
2813                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2814                    if hashed_storage_cursor
2815                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2816                        .is_some_and(|s| s.key == hashed_storage_key)
2817                    {
2818                        hashed_storage_cursor.delete_current()?
2819                    }
2820
2821                    if !old_storage_value.is_zero() {
2822                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2823                    }
2824                }
2825            }
2826        } else {
2827            // This is not working for blocks that are not at tip. as plain state is not the last
2828            // state of end range. We should rename the functions or add support to access
2829            // History state. Accessing history state can be tricky but we are not gaining
2830            // anything.
2831            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2832            let mut plain_storage_cursor =
2833                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2834
2835            let (state, _) = self.populate_bundle_state_plain(
2836                account_changeset,
2837                storage_changeset,
2838                &mut plain_accounts_cursor,
2839                &mut plain_storage_cursor,
2840            )?;
2841
2842            for (address, (old_account, new_account, storage)) in &state {
2843                if old_account != new_account {
2844                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2845                    if let Some(account) = old_account {
2846                        plain_accounts_cursor.upsert(*address, account)?;
2847                    } else if existing_entry.is_some() {
2848                        plain_accounts_cursor.delete_current()?;
2849                    }
2850                }
2851
2852                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2853                    let storage_entry =
2854                        StorageEntry { key: *storage_key, value: *old_storage_value };
2855                    if plain_storage_cursor
2856                        .seek_by_key_subkey(*address, *storage_key)?
2857                        .is_some_and(|s| s.key == *storage_key)
2858                    {
2859                        plain_storage_cursor.delete_current()?
2860                    }
2861
2862                    if !old_storage_value.is_zero() {
2863                        plain_storage_cursor.upsert(*address, &storage_entry)?;
2864                    }
2865                }
2866            }
2867        }
2868
2869        self.remove_receipts_from(from_transaction_num, block)?;
2870
2871        Ok(())
2872    }
2873
2874    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2875    ///
2876    /// The latest state will be unwound and returned back with all the blocks
2877    ///
2878    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2879    ///    transaction ids.
2880    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2881    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2882    ///    the changesets.
2883    ///    - In order to have both the old and new values in the changesets, we also access the
2884    ///      plain state tables.
2885    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2886    ///    we:
2887    ///     1. Take the old value from the changeset
2888    ///     2. Take the new value from the plain state
2889    ///     3. Save the old value to the local state
2890    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2891    ///    have seen before we:
2892    ///     1. Take the old value from the changeset
2893    ///     2. Take the new value from the local state
2894    ///     3. Set the local state to the value in the changeset
2895    fn take_state_above(
2896        &self,
2897        block: BlockNumber,
2898    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2899        let range = block + 1..=self.last_block_number()?;
2900
2901        if range.is_empty() {
2902            return Ok(ExecutionOutcome::default())
2903        }
2904        let start_block_number = *range.start();
2905
2906        // We are not removing block meta as it is used to get block changesets.
2907        let block_bodies = self.block_body_indices_range(range.clone())?;
2908
2909        // get transaction receipts
2910        let from_transaction_num =
2911            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2912        let to_transaction_num =
2913            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2914
2915        let storage_range = BlockNumberAddress::range(range.clone());
2916        let storage_changeset = if let Some(highest_block) = self
2917            .static_file_provider
2918            .get_highest_static_file_block(StaticFileSegment::StorageChangeSets) &&
2919            self.cached_storage_settings().storage_v2
2920        {
2921            let changesets = self.storage_changesets_range(block + 1..=highest_block)?;
2922            let mut changeset_writer =
2923                self.static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets)?;
2924            changeset_writer.prune_storage_changesets(block)?;
2925            changesets
2926        } else {
2927            self.take::<tables::StorageChangeSets>(storage_range)?.into_iter().collect()
2928        };
2929
2930        // if there are static files for this segment, prune them.
2931        let highest_changeset_block = self
2932            .static_file_provider
2933            .get_highest_static_file_block(StaticFileSegment::AccountChangeSets);
2934        let account_changeset = if let Some(highest_block) = highest_changeset_block &&
2935            self.cached_storage_settings().storage_v2
2936        {
2937            // TODO: add a `take` method that removes and returns the items instead of doing this
2938            let changesets = self.account_changesets_range(block + 1..highest_block + 1)?;
2939            let mut changeset_writer =
2940                self.static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets)?;
2941            changeset_writer.prune_account_changesets(block)?;
2942
2943            changesets
2944        } else {
2945            // Have to remove from static files if they exist, otherwise remove using `take` for the
2946            // changeset tables
2947            self.take::<tables::AccountChangeSets>(range)?
2948        };
2949
2950        let (state, reverts) = if self.cached_storage_settings().use_hashed_state() {
2951            let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2952            let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2953
2954            let (state, reverts) = self.populate_bundle_state_hashed(
2955                account_changeset,
2956                storage_changeset,
2957                &mut hashed_accounts_cursor,
2958                &mut hashed_storage_cursor,
2959            )?;
2960
2961            for (address, (old_account, new_account, storage)) in &state {
2962                if old_account != new_account {
2963                    let hashed_address = keccak256(address);
2964                    let existing_entry = hashed_accounts_cursor.seek_exact(hashed_address)?;
2965                    if let Some(account) = old_account {
2966                        hashed_accounts_cursor.upsert(hashed_address, account)?;
2967                    } else if existing_entry.is_some() {
2968                        hashed_accounts_cursor.delete_current()?;
2969                    }
2970                }
2971
2972                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2973                    let hashed_address = keccak256(address);
2974                    let hashed_storage_key = keccak256(storage_key);
2975                    let storage_entry =
2976                        StorageEntry { key: hashed_storage_key, value: *old_storage_value };
2977                    if hashed_storage_cursor
2978                        .seek_by_key_subkey(hashed_address, hashed_storage_key)?
2979                        .is_some_and(|s| s.key == hashed_storage_key)
2980                    {
2981                        hashed_storage_cursor.delete_current()?
2982                    }
2983
2984                    if !old_storage_value.is_zero() {
2985                        hashed_storage_cursor.upsert(hashed_address, &storage_entry)?;
2986                    }
2987                }
2988            }
2989
2990            (state, reverts)
2991        } else {
2992            // This is not working for blocks that are not at tip. as plain state is not the last
2993            // state of end range. We should rename the functions or add support to access
2994            // History state. Accessing history state can be tricky but we are not gaining
2995            // anything.
2996            let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2997            let mut plain_storage_cursor =
2998                self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2999
3000            let (state, reverts) = self.populate_bundle_state_plain(
3001                account_changeset,
3002                storage_changeset,
3003                &mut plain_accounts_cursor,
3004                &mut plain_storage_cursor,
3005            )?;
3006
3007            for (address, (old_account, new_account, storage)) in &state {
3008                if old_account != new_account {
3009                    let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
3010                    if let Some(account) = old_account {
3011                        plain_accounts_cursor.upsert(*address, account)?;
3012                    } else if existing_entry.is_some() {
3013                        plain_accounts_cursor.delete_current()?;
3014                    }
3015                }
3016
3017                for (storage_key, (old_storage_value, _new_storage_value)) in storage {
3018                    let storage_entry =
3019                        StorageEntry { key: *storage_key, value: *old_storage_value };
3020                    if plain_storage_cursor
3021                        .seek_by_key_subkey(*address, *storage_key)?
3022                        .is_some_and(|s| s.key == *storage_key)
3023                    {
3024                        plain_storage_cursor.delete_current()?
3025                    }
3026
3027                    if !old_storage_value.is_zero() {
3028                        plain_storage_cursor.upsert(*address, &storage_entry)?;
3029                    }
3030                }
3031            }
3032
3033            (state, reverts)
3034        };
3035
3036        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
3037        let mut receipts_iter = self
3038            .static_file_provider
3039            .get_range_with_static_file_or_database(
3040                StaticFileSegment::Receipts,
3041                from_transaction_num..to_transaction_num + 1,
3042                |static_file, range, _| {
3043                    static_file
3044                        .receipts_by_tx_range(range.clone())
3045                        .map(|r| range.into_iter().zip(r).collect())
3046                },
3047                |range, _| {
3048                    self.tx
3049                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
3050                        .walk_range(range)?
3051                        .map(|r| r.map_err(Into::into))
3052                        .collect()
3053                },
3054                |_| true,
3055            )?
3056            .into_iter()
3057            .peekable();
3058
3059        let mut receipts = Vec::with_capacity(block_bodies.len());
3060        // loop break if we are at the end of the blocks.
3061        for block_body in block_bodies {
3062            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
3063            for num in block_body.tx_num_range() {
3064                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
3065                    block_receipts.push(receipts_iter.next().unwrap().1);
3066                }
3067            }
3068            receipts.push(block_receipts);
3069        }
3070
3071        self.remove_receipts_from(from_transaction_num, block)?;
3072
3073        Ok(ExecutionOutcome::new_init(
3074            state,
3075            reverts,
3076            Vec::new(),
3077            receipts,
3078            start_block_number,
3079            Vec::new(),
3080        ))
3081    }
3082}
3083
3084impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
3085    fn write_account_trie_updates<A: TrieTableAdapter>(
3086        tx: &TX,
3087        trie_updates: &TrieUpdatesSorted,
3088        num_entries: &mut usize,
3089    ) -> ProviderResult<()>
3090    where
3091        TX: DbTxMut,
3092    {
3093        let mut account_trie_cursor = tx.cursor_write::<A::AccountTrieTable>()?;
3094        // Process sorted account nodes
3095        for (key, updated_node) in trie_updates.account_nodes_ref() {
3096            let nibbles = A::AccountKey::from(*key);
3097            match updated_node {
3098                Some(node) => {
3099                    if !key.is_empty() {
3100                        *num_entries += 1;
3101                        account_trie_cursor.upsert(nibbles, node)?;
3102                    }
3103                }
3104                None => {
3105                    *num_entries += 1;
3106                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
3107                        account_trie_cursor.delete_current()?;
3108                    }
3109                }
3110            }
3111        }
3112        Ok(())
3113    }
3114
3115    fn write_storage_tries<A: TrieTableAdapter>(
3116        tx: &TX,
3117        storage_tries: Vec<(&B256, &StorageTrieUpdatesSorted)>,
3118        num_entries: &mut usize,
3119    ) -> ProviderResult<()>
3120    where
3121        TX: DbTxMut,
3122    {
3123        let mut cursor = tx.cursor_dup_write::<A::StorageTrieTable>()?;
3124        for (hashed_address, storage_trie_updates) in storage_tries {
3125            let mut db_storage_trie_cursor: DatabaseStorageTrieCursor<_, A> =
3126                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
3127            *num_entries +=
3128                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
3129            cursor = db_storage_trie_cursor.cursor;
3130        }
3131        Ok(())
3132    }
3133}
3134
3135impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
3136    /// Writes trie updates to the database with already sorted updates.
3137    ///
3138    /// Returns the number of entries modified.
3139    #[instrument(level = "debug", target = "providers::db", skip_all)]
3140    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
3141        if trie_updates.is_empty() {
3142            return Ok(0)
3143        }
3144
3145        // Track the number of inserted entries.
3146        let mut num_entries = 0;
3147
3148        reth_trie_db::with_adapter!(self, |A| {
3149            Self::write_account_trie_updates::<A>(self.tx_ref(), trie_updates, &mut num_entries)?;
3150        });
3151
3152        num_entries +=
3153            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
3154
3155        Ok(num_entries)
3156    }
3157}
3158
3159impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
3160    /// Writes storage trie updates from the given storage trie map with already sorted updates.
3161    ///
3162    /// Expects the storage trie updates to already be sorted by the hashed address key.
3163    ///
3164    /// Returns the number of entries modified.
3165    fn write_storage_trie_updates_sorted<'a>(
3166        &self,
3167        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
3168    ) -> ProviderResult<usize> {
3169        let mut num_entries = 0;
3170        let mut storage_tries = storage_tries.collect::<Vec<_>>();
3171        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
3172        reth_trie_db::with_adapter!(self, |A| {
3173            Self::write_storage_tries::<A>(self.tx_ref(), storage_tries, &mut num_entries)?;
3174        });
3175        Ok(num_entries)
3176    }
3177}
3178
3179impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
3180    fn unwind_account_hashing<'a>(
3181        &self,
3182        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3183    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3184        // Aggregate all block changesets and make a list of accounts that have been changed.
3185        // Note that collecting and then reversing the order is necessary to ensure that the
3186        // changes are applied in the correct order.
3187        let hashed_accounts = changesets
3188            .into_iter()
3189            .map(|(_, e)| (keccak256(e.address), e.info))
3190            .collect::<Vec<_>>()
3191            .into_iter()
3192            .rev()
3193            .collect::<BTreeMap<_, _>>();
3194
3195        // Apply values to HashedState, and remove the account if it's None.
3196        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3197        for (hashed_address, account) in &hashed_accounts {
3198            if let Some(account) = account {
3199                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3200            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3201                hashed_accounts_cursor.delete_current()?;
3202            }
3203        }
3204
3205        Ok(hashed_accounts)
3206    }
3207
3208    fn unwind_account_hashing_range(
3209        &self,
3210        range: impl RangeBounds<BlockNumber>,
3211    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3212        let changesets = self.account_changesets_range(range)?;
3213        self.unwind_account_hashing(changesets.iter())
3214    }
3215
3216    fn insert_account_for_hashing(
3217        &self,
3218        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
3219    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
3220        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
3221        let hashed_accounts =
3222            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
3223        for (hashed_address, account) in &hashed_accounts {
3224            if let Some(account) = account {
3225                hashed_accounts_cursor.upsert(*hashed_address, account)?;
3226            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
3227                hashed_accounts_cursor.delete_current()?;
3228            }
3229        }
3230        Ok(hashed_accounts)
3231    }
3232
3233    fn unwind_storage_hashing(
3234        &self,
3235        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3236    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3237        // Aggregate all block changesets and make list of accounts that have been changed.
3238        let mut hashed_storages = changesets
3239            .into_iter()
3240            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
3241                let hashed_key = keccak256(storage_entry.key);
3242                (keccak256(address), hashed_key, storage_entry.value)
3243            })
3244            .collect::<Vec<_>>();
3245        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
3246
3247        // Apply values to HashedState, and remove the account if it's None.
3248        let mut hashed_storage_keys: B256Map<BTreeSet<B256>> =
3249            B256Map::with_capacity_and_hasher(hashed_storages.len(), Default::default());
3250        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3251        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
3252            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
3253
3254            if hashed_storage
3255                .seek_by_key_subkey(hashed_address, key)?
3256                .is_some_and(|entry| entry.key == key)
3257            {
3258                hashed_storage.delete_current()?;
3259            }
3260
3261            if !value.is_zero() {
3262                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
3263            }
3264        }
3265        Ok(hashed_storage_keys)
3266    }
3267
3268    fn unwind_storage_hashing_range(
3269        &self,
3270        range: impl RangeBounds<BlockNumber>,
3271    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3272        let changesets = self.storage_changesets_range(range)?;
3273        self.unwind_storage_hashing(changesets.into_iter())
3274    }
3275
3276    fn insert_storage_for_hashing(
3277        &self,
3278        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
3279    ) -> ProviderResult<B256Map<BTreeSet<B256>>> {
3280        // hash values
3281        let hashed_storages =
3282            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
3283                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
3284                    map.insert(keccak256(entry.key), entry.value);
3285                    map
3286                });
3287                map.insert(keccak256(address), storage);
3288                map
3289            });
3290
3291        let hashed_storage_keys = hashed_storages
3292            .iter()
3293            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
3294            .collect();
3295
3296        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
3297        // Hash the address and key and apply them to HashedStorage (if Storage is None
3298        // just remove it);
3299        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
3300            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
3301                if hashed_storage_cursor
3302                    .seek_by_key_subkey(hashed_address, key)?
3303                    .is_some_and(|entry| entry.key == key)
3304                {
3305                    hashed_storage_cursor.delete_current()?;
3306                }
3307
3308                if !value.is_zero() {
3309                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
3310                }
3311                Ok(())
3312            })
3313        })?;
3314
3315        Ok(hashed_storage_keys)
3316    }
3317}
3318
3319impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
3320    fn unwind_account_history_indices<'a>(
3321        &self,
3322        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
3323    ) -> ProviderResult<usize> {
3324        let mut last_indices = changesets
3325            .into_iter()
3326            .map(|(index, account)| (account.address, *index))
3327            .collect::<Vec<_>>();
3328        last_indices.sort_unstable_by_key(|(a, _)| *a);
3329
3330        if self.cached_storage_settings().storage_v2 {
3331            let batch = self.rocksdb_provider.unwind_account_history_indices(&last_indices)?;
3332            self.pending_rocksdb_batches.lock().push(batch);
3333        } else {
3334            // Unwind the account history index in MDBX.
3335            let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
3336            for &(address, rem_index) in &last_indices {
3337                let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
3338                    &mut cursor,
3339                    ShardedKey::last(address),
3340                    rem_index,
3341                    |sharded_key| sharded_key.key == address,
3342                )?;
3343
3344                // Check the last returned partial shard.
3345                // If it's not empty, the shard needs to be reinserted.
3346                if !partial_shard.is_empty() {
3347                    cursor.insert(
3348                        ShardedKey::last(address),
3349                        &BlockNumberList::new_pre_sorted(partial_shard),
3350                    )?;
3351                }
3352            }
3353        }
3354
3355        let changesets = last_indices.len();
3356        Ok(changesets)
3357    }
3358
3359    fn unwind_account_history_indices_range(
3360        &self,
3361        range: impl RangeBounds<BlockNumber>,
3362    ) -> ProviderResult<usize> {
3363        let changesets = self.account_changesets_range(range)?;
3364        self.unwind_account_history_indices(changesets.iter())
3365    }
3366
3367    fn insert_account_history_index(
3368        &self,
3369        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
3370    ) -> ProviderResult<()> {
3371        self.append_history_index::<_, tables::AccountsHistory>(
3372            account_transitions,
3373            ShardedKey::new,
3374        )
3375    }
3376
3377    fn unwind_storage_history_indices(
3378        &self,
3379        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
3380    ) -> ProviderResult<usize> {
3381        let mut storage_changesets = changesets
3382            .into_iter()
3383            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
3384            .collect::<Vec<_>>();
3385        storage_changesets.sort_unstable_by_key(|(address, key, _)| (*address, *key));
3386
3387        if self.cached_storage_settings().storage_v2 {
3388            let batch =
3389                self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?;
3390            self.pending_rocksdb_batches.lock().push(batch);
3391        } else {
3392            // Unwind the storage history index in MDBX.
3393            let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
3394            for &(address, storage_key, rem_index) in &storage_changesets {
3395                let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
3396                    &mut cursor,
3397                    StorageShardedKey::last(address, storage_key),
3398                    rem_index,
3399                    |storage_sharded_key| {
3400                        storage_sharded_key.address == address &&
3401                            storage_sharded_key.sharded_key.key == storage_key
3402                    },
3403                )?;
3404
3405                // Check the last returned partial shard.
3406                // If it's not empty, the shard needs to be reinserted.
3407                if !partial_shard.is_empty() {
3408                    cursor.insert(
3409                        StorageShardedKey::last(address, storage_key),
3410                        &BlockNumberList::new_pre_sorted(partial_shard),
3411                    )?;
3412                }
3413            }
3414        }
3415
3416        let changesets = storage_changesets.len();
3417        Ok(changesets)
3418    }
3419
3420    fn unwind_storage_history_indices_range(
3421        &self,
3422        range: impl RangeBounds<BlockNumber>,
3423    ) -> ProviderResult<usize> {
3424        let changesets = self.storage_changesets_range(range)?;
3425        self.unwind_storage_history_indices(changesets.into_iter())
3426    }
3427
3428    fn insert_storage_history_index(
3429        &self,
3430        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
3431    ) -> ProviderResult<()> {
3432        self.append_history_index::<_, tables::StoragesHistory>(
3433            storage_transitions,
3434            |(address, storage_key), highest_block_number| {
3435                StorageShardedKey::new(address, storage_key, highest_block_number)
3436            },
3437        )
3438    }
3439
3440    #[instrument(level = "debug", target = "providers::db", skip_all)]
3441    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
3442        let storage_settings = self.cached_storage_settings();
3443        if !storage_settings.storage_v2 {
3444            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
3445            self.insert_account_history_index(indices)?;
3446        }
3447
3448        if !storage_settings.storage_v2 {
3449            let indices = self.changed_storages_and_blocks_with_range(range)?;
3450            self.insert_storage_history_index(indices)?;
3451        }
3452
3453        Ok(())
3454    }
3455}
3456
3457impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockExecutionWriter
3458    for DatabaseProvider<TX, N>
3459{
3460    fn take_block_and_execution_above(
3461        &self,
3462        block: BlockNumber,
3463    ) -> ProviderResult<Chain<Self::Primitives>> {
3464        let range = block + 1..=self.last_block_number()?;
3465
3466        self.unwind_trie_state_from(block + 1)?;
3467
3468        // get execution res
3469        let execution_state = self.take_state_above(block)?;
3470
3471        let blocks = self.recovered_block_range(range)?;
3472
3473        // remove block bodies it is needed for both get block range and get block execution results
3474        // that is why it is deleted afterwards.
3475        self.remove_blocks_above(block)?;
3476
3477        // Update pipeline progress
3478        self.update_pipeline_stages(block, true)?;
3479
3480        Ok(Chain::new(blocks, execution_state, BTreeMap::new()))
3481    }
3482
3483    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
3484        self.unwind_trie_state_from(block + 1)?;
3485
3486        // remove execution res
3487        self.remove_state_above(block)?;
3488
3489        // remove block bodies it is needed for both get block range and get block execution results
3490        // that is why it is deleted afterwards.
3491        self.remove_blocks_above(block)?;
3492
3493        // Update pipeline progress
3494        self.update_pipeline_stages(block, true)?;
3495
3496        Ok(())
3497    }
3498}
3499
3500impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
3501    for DatabaseProvider<TX, N>
3502{
3503    type Block = BlockTy<N>;
3504    type Receipt = ReceiptTy<N>;
3505
3506    /// Inserts the block into the database, writing to both static files and MDBX.
3507    ///
3508    /// This is a convenience method primarily used in tests. For production use,
3509    /// prefer [`Self::save_blocks`] which handles execution output and trie data.
3510    fn insert_block(
3511        &self,
3512        block: &RecoveredBlock<Self::Block>,
3513    ) -> ProviderResult<StoredBlockBodyIndices> {
3514        let block_number = block.number();
3515
3516        // Wrap block in ExecutedBlock with empty execution output (no receipts/state/trie)
3517        let executed_block = ExecutedBlock::new(
3518            Arc::new(block.clone()),
3519            Arc::new(BlockExecutionOutput {
3520                result: BlockExecutionResult {
3521                    receipts: Default::default(),
3522                    requests: Default::default(),
3523                    gas_used: 0,
3524                    blob_gas_used: 0,
3525                },
3526                state: Default::default(),
3527            }),
3528            ComputedTrieData::default(),
3529        );
3530
3531        // Delegate to save_blocks with BlocksOnly mode (skips receipts/state/trie)
3532        self.save_blocks(vec![executed_block], SaveBlocksMode::BlocksOnly)?;
3533
3534        // Return the body indices
3535        self.block_body_indices(block_number)?
3536            .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))
3537    }
3538
3539    fn append_block_bodies(
3540        &self,
3541        bodies: Vec<(BlockNumber, Option<&BodyTy<N>>)>,
3542    ) -> ProviderResult<()> {
3543        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
3544
3545        // Initialize writer if we will be writing transactions to staticfiles
3546        let mut tx_writer =
3547            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
3548
3549        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
3550        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
3551
3552        // Get id for the next tx_num or zero if there are no transactions.
3553        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
3554
3555        for (block_number, body) in &bodies {
3556            // Increment block on static file header.
3557            tx_writer.increment_block(*block_number)?;
3558
3559            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
3560            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
3561
3562            let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3563
3564            // insert block meta
3565            block_indices_cursor.append(*block_number, &block_indices)?;
3566
3567            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
3568
3569            let Some(body) = body else { continue };
3570
3571            // write transaction block index
3572            if !body.transactions().is_empty() {
3573                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
3574                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
3575            }
3576
3577            // write transactions
3578            for transaction in body.transactions() {
3579                tx_writer.append_transaction(next_tx_num, transaction)?;
3580
3581                // Increment transaction id for each transaction.
3582                next_tx_num += 1;
3583            }
3584        }
3585
3586        self.storage.writer().write_block_bodies(self, bodies)?;
3587
3588        Ok(())
3589    }
3590
3591    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
3592        let last_block_number = self.last_block_number()?;
3593        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
3594        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
3595            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3596        }
3597
3598        // Get highest static file block for the total block range
3599        let highest_static_file_block = self
3600            .static_file_provider()
3601            .get_highest_static_file_block(StaticFileSegment::Headers)
3602            .expect("todo: error handling, headers should exist");
3603
3604        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
3605        // we remove only what is ABOVE the block.
3606        //
3607        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
3608        // will have three blocks to remove, which will be block 8, 7, and 6.
3609        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
3610        self.static_file_provider()
3611            .get_writer(block, StaticFileSegment::Headers)?
3612            .prune_headers(highest_static_file_block.saturating_sub(block))?;
3613
3614        // First transaction to be removed
3615        let unwind_tx_from = self
3616            .block_body_indices(block)?
3617            .map(|b| b.next_tx_num())
3618            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3619
3620        // Last transaction to be removed
3621        let unwind_tx_to = self
3622            .tx
3623            .cursor_read::<tables::BlockBodyIndices>()?
3624            .last()?
3625            // shouldn't happen because this was OK above
3626            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3627            .1
3628            .last_tx_num();
3629
3630        if unwind_tx_from <= unwind_tx_to {
3631            let hashes = self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))?;
3632            self.with_rocksdb_batch(|batch| {
3633                let mut writer = EitherWriter::new_transaction_hash_numbers(self, batch)?;
3634                for (hash, _) in hashes {
3635                    writer.delete_transaction_hash_number(hash)?;
3636                }
3637                Ok(((), writer.into_raw_rocksdb_batch()))
3638            })?;
3639        }
3640
3641        // Skip sender pruning when sender_recovery is fully pruned, since no sender data
3642        // exists in static files or the database.
3643        if self.prune_modes.sender_recovery.is_none_or(|m| !m.is_full()) {
3644            EitherWriter::new_senders(self, last_block_number)?
3645                .prune_senders(unwind_tx_from, block)?;
3646        }
3647
3648        self.remove_bodies_above(block)?;
3649
3650        Ok(())
3651    }
3652
3653    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
3654        self.storage.writer().remove_block_bodies_above(self, block)?;
3655
3656        // First transaction to be removed
3657        let unwind_tx_from = self
3658            .block_body_indices(block)?
3659            .map(|b| b.next_tx_num())
3660            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3661
3662        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3663        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3664
3665        let static_file_tx_num =
3666            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3667
3668        let to_delete = static_file_tx_num
3669            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3670            .unwrap_or_default();
3671
3672        self.static_file_provider
3673            .latest_writer(StaticFileSegment::Transactions)?
3674            .prune_transactions(to_delete, block)?;
3675
3676        Ok(())
3677    }
3678
3679    /// Appends blocks with their execution state to the database.
3680    ///
3681    /// **Note:** This function is only used in tests.
3682    ///
3683    /// History indices are written to the appropriate backend based on storage settings:
3684    /// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
3685    ///
3686    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3687    fn append_blocks_with_state(
3688        &self,
3689        blocks: Vec<RecoveredBlock<Self::Block>>,
3690        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3691        hashed_state: HashedPostStateSorted,
3692    ) -> ProviderResult<()> {
3693        if blocks.is_empty() {
3694            debug!(target: "providers::db", "Attempted to append empty block range");
3695            return Ok(())
3696        }
3697
3698        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3699        // `None`.
3700        let first_number = blocks[0].number();
3701
3702        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3703        // `None`.
3704        let last_block_number = blocks[blocks.len() - 1].number();
3705
3706        let mut durations_recorder = metrics::DurationsRecorder::new(&self.metrics);
3707
3708        // Extract account and storage transitions from the bundle reverts BEFORE writing state.
3709        // This is necessary because with edge storage, changesets are written to static files
3710        // whose index isn't updated until commit, making them invisible to subsequent reads
3711        // within the same transaction.
3712        let (account_transitions, storage_transitions) = {
3713            let mut account_transitions: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
3714            let mut storage_transitions: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
3715            for (block_idx, block_reverts) in execution_outcome.bundle.reverts.iter().enumerate() {
3716                let block_number = first_number + block_idx as u64;
3717                for (address, account_revert) in block_reverts {
3718                    account_transitions.entry(*address).or_default().push(block_number);
3719                    for storage_key in account_revert.storage.keys() {
3720                        let key = B256::from(storage_key.to_be_bytes());
3721                        storage_transitions.entry((*address, key)).or_default().push(block_number);
3722                    }
3723                }
3724            }
3725            (account_transitions, storage_transitions)
3726        };
3727
3728        // Insert the blocks
3729        for block in blocks {
3730            self.insert_block(&block)?;
3731            durations_recorder.record_relative(metrics::Action::InsertBlock);
3732        }
3733
3734        self.write_state(execution_outcome, OriginalValuesKnown::No, StateWriteConfig::default())?;
3735        durations_recorder.record_relative(metrics::Action::InsertState);
3736
3737        // insert hashes and intermediate merkle nodes
3738        self.write_hashed_state(&hashed_state)?;
3739        durations_recorder.record_relative(metrics::Action::InsertHashes);
3740
3741        // Use pre-computed transitions for history indices since static file
3742        // writes aren't visible until commit.
3743        // Note: For MDBX we use insert_*_history_index. For RocksDB we use
3744        // append_*_history_shard which handles read-merge-write internally.
3745        let storage_settings = self.cached_storage_settings();
3746        if storage_settings.storage_v2 {
3747            self.with_rocksdb_batch(|mut batch| {
3748                for (address, blocks) in account_transitions {
3749                    batch.append_account_history_shard(address, blocks)?;
3750                }
3751                Ok(((), Some(batch.into_inner())))
3752            })?;
3753        } else {
3754            self.insert_account_history_index(account_transitions)?;
3755        }
3756        if storage_settings.storage_v2 {
3757            self.with_rocksdb_batch(|mut batch| {
3758                for ((address, key), blocks) in storage_transitions {
3759                    batch.append_storage_history_shard(address, key, blocks)?;
3760                }
3761                Ok(((), Some(batch.into_inner())))
3762            })?;
3763        } else {
3764            self.insert_storage_history_index(storage_transitions)?;
3765        }
3766        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3767
3768        // Update pipeline progress
3769        self.update_pipeline_stages(last_block_number, false)?;
3770        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3771
3772        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3773
3774        Ok(())
3775    }
3776}
3777
3778impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3779    fn get_prune_checkpoint(
3780        &self,
3781        segment: PruneSegment,
3782    ) -> ProviderResult<Option<PruneCheckpoint>> {
3783        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3784    }
3785
3786    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3787        Ok(PruneSegment::variants()
3788            .filter_map(|segment| {
3789                self.tx
3790                    .get::<tables::PruneCheckpoints>(segment)
3791                    .transpose()
3792                    .map(|chk| chk.map(|chk| (segment, chk)))
3793            })
3794            .collect::<Result<_, _>>()?)
3795    }
3796}
3797
3798impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3799    fn save_prune_checkpoint(
3800        &self,
3801        segment: PruneSegment,
3802        checkpoint: PruneCheckpoint,
3803    ) -> ProviderResult<()> {
3804        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3805    }
3806}
3807
3808impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3809    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3810        let db_entries = self.tx.entries::<T>()?;
3811        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3812            Ok(entries) => entries,
3813            Err(ProviderError::UnsupportedProvider) => 0,
3814            Err(err) => return Err(err),
3815        };
3816
3817        Ok(db_entries + static_file_entries)
3818    }
3819}
3820
3821impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3822    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3823        let mut finalized_blocks = self
3824            .tx
3825            .cursor_read::<tables::ChainState>()?
3826            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3827            .take(1)
3828            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3829
3830        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3831        Ok(last_finalized_block_number)
3832    }
3833
3834    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3835        let mut finalized_blocks = self
3836            .tx
3837            .cursor_read::<tables::ChainState>()?
3838            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3839            .take(1)
3840            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3841
3842        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3843        Ok(last_finalized_block_number)
3844    }
3845}
3846
3847impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3848    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3849        Ok(self
3850            .tx
3851            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3852    }
3853
3854    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3855        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3856    }
3857}
3858
3859impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3860    type Tx = TX;
3861
3862    fn tx_ref(&self) -> &Self::Tx {
3863        &self.tx
3864    }
3865
3866    fn tx_mut(&mut self) -> &mut Self::Tx {
3867        &mut self.tx
3868    }
3869
3870    fn into_tx(self) -> Self::Tx {
3871        self.tx
3872    }
3873
3874    fn prune_modes_ref(&self) -> &PruneModes {
3875        self.prune_modes_ref()
3876    }
3877
3878    /// Commit database transaction, static files, and pending `RocksDB` batches.
3879    #[instrument(
3880        name = "DatabaseProvider::commit",
3881        level = "debug",
3882        target = "providers::db",
3883        skip_all
3884    )]
3885    fn commit(self) -> ProviderResult<()> {
3886        if self.static_file_provider.has_unwind_queued() || self.commit_order.is_unwind() {
3887            self.commit_unwind()?;
3888        } else {
3889            // Normal path: finalize() will call sync_all() if not already synced
3890            let mut timings = metrics::CommitTimings::default();
3891
3892            let start = Instant::now();
3893            self.static_file_provider.finalize()?;
3894            timings.sf = start.elapsed();
3895
3896            let start = Instant::now();
3897            let batches = std::mem::take(&mut *self.pending_rocksdb_batches.lock());
3898            for batch in batches {
3899                self.rocksdb_provider.commit_batch(batch)?;
3900            }
3901            timings.rocksdb = start.elapsed();
3902
3903            let start = Instant::now();
3904            self.tx.commit()?;
3905            timings.mdbx = start.elapsed();
3906
3907            self.metrics.record_commit(&timings);
3908        }
3909
3910        Ok(())
3911    }
3912}
3913
3914impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3915    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3916        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3917    }
3918}
3919
3920impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3921    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3922        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3923    }
3924}
3925
3926impl<TX: Send, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3927    fn cached_storage_settings(&self) -> StorageSettings {
3928        *self.storage_settings.read()
3929    }
3930
3931    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3932        *self.storage_settings.write() = settings;
3933    }
3934}
3935
3936impl<TX: Send, N: NodeTypes> StoragePath for DatabaseProvider<TX, N> {
3937    fn storage_path(&self) -> PathBuf {
3938        self.db_path.clone()
3939    }
3940}
3941
3942#[cfg(test)]
3943mod tests {
3944    use super::*;
3945    use crate::{
3946        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3947        BlockWriter,
3948    };
3949    use alloy_consensus::Header;
3950    use alloy_primitives::{
3951        map::{AddressMap, B256Map},
3952        U256,
3953    };
3954    use reth_chain_state::ExecutedBlock;
3955    use reth_db_api::models::StorageSettings;
3956    use reth_ethereum_primitives::Receipt;
3957    use reth_execution_types::{AccountRevertInit, BlockExecutionOutput, BlockExecutionResult};
3958    use reth_primitives_traits::SealedBlock;
3959    use reth_storage_api::MetadataWriter;
3960    use reth_testing_utils::generators::{self, random_block, BlockParams};
3961    use reth_trie::{
3962        HashedPostState, KeccakKeyHasher, Nibbles, StoredNibbles, StoredNibblesSubKey,
3963    };
3964    use revm_database::BundleState;
3965    use revm_state::AccountInfo;
3966    use std::{sync::mpsc, time::Duration};
3967
3968    #[test]
3969    fn test_receipts_by_block_range_empty_range() {
3970        let factory = create_test_provider_factory();
3971        let provider = factory.provider().unwrap();
3972
3973        // empty range should return empty vec
3974        let start = 10u64;
3975        let end = 9u64;
3976        let result = provider.receipts_by_block_range(start..=end).unwrap();
3977        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3978    }
3979
3980    #[test]
3981    fn unwind_commit_waits_for_pre_commit_readers() {
3982        let factory = create_test_provider_factory();
3983
3984        let reader = factory.provider().unwrap();
3985        let provider_rw = factory.unwind_provider_rw().unwrap();
3986        provider_rw.write_metadata("unwind-wait-test", vec![1]).unwrap();
3987        let (done_tx, done_rx) = mpsc::channel();
3988
3989        let handle = std::thread::spawn(move || {
3990            let result = provider_rw.commit();
3991            done_tx.send(result).unwrap();
3992        });
3993
3994        assert!(
3995            done_rx.recv_timeout(Duration::from_millis(50)).is_err(),
3996            "unwind commit should wait while an older read transaction is still open"
3997        );
3998
3999        drop(reader);
4000
4001        done_rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
4002        handle.join().unwrap();
4003    }
4004
4005    #[test]
4006    fn test_receipts_by_block_range_nonexistent_blocks() {
4007        let factory = create_test_provider_factory();
4008        let provider = factory.provider().unwrap();
4009
4010        // non-existent blocks should return empty vecs for each block
4011        let result = provider.receipts_by_block_range(10..=12).unwrap();
4012        assert_eq!(result, vec![vec![], vec![], vec![]]);
4013    }
4014
4015    #[test]
4016    fn test_receipts_by_block_range_single_block() {
4017        let factory = create_test_provider_factory();
4018        let data = BlockchainTestData::default();
4019
4020        let provider_rw = factory.provider_rw().unwrap();
4021        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4022        provider_rw
4023            .write_state(
4024                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4025                crate::OriginalValuesKnown::No,
4026                StateWriteConfig::default(),
4027            )
4028            .unwrap();
4029        provider_rw.insert_block(&data.blocks[0].0).unwrap();
4030        provider_rw
4031            .write_state(
4032                &data.blocks[0].1,
4033                crate::OriginalValuesKnown::No,
4034                StateWriteConfig::default(),
4035            )
4036            .unwrap();
4037        provider_rw.commit().unwrap();
4038
4039        let provider = factory.provider().unwrap();
4040        let result = provider.receipts_by_block_range(1..=1).unwrap();
4041
4042        // should have one vec with one receipt
4043        assert_eq!(result.len(), 1);
4044        assert_eq!(result[0].len(), 1);
4045        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
4046    }
4047
4048    #[test]
4049    fn test_receipts_by_block_range_multiple_blocks() {
4050        let factory = create_test_provider_factory();
4051        let data = BlockchainTestData::default();
4052
4053        let provider_rw = factory.provider_rw().unwrap();
4054        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4055        provider_rw
4056            .write_state(
4057                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4058                crate::OriginalValuesKnown::No,
4059                StateWriteConfig::default(),
4060            )
4061            .unwrap();
4062        for i in 0..3 {
4063            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4064            provider_rw
4065                .write_state(
4066                    &data.blocks[i].1,
4067                    crate::OriginalValuesKnown::No,
4068                    StateWriteConfig::default(),
4069                )
4070                .unwrap();
4071        }
4072        provider_rw.commit().unwrap();
4073
4074        let provider = factory.provider().unwrap();
4075        let result = provider.receipts_by_block_range(1..=3).unwrap();
4076
4077        // should have 3 vecs, each with one receipt
4078        assert_eq!(result.len(), 3);
4079        for (i, block_receipts) in result.iter().enumerate() {
4080            assert_eq!(block_receipts.len(), 1);
4081            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
4082        }
4083    }
4084
4085    #[test]
4086    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
4087        let factory = create_test_provider_factory();
4088        let data = BlockchainTestData::default();
4089
4090        let provider_rw = factory.provider_rw().unwrap();
4091        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4092        provider_rw
4093            .write_state(
4094                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4095                crate::OriginalValuesKnown::No,
4096                StateWriteConfig::default(),
4097            )
4098            .unwrap();
4099
4100        // insert blocks 1-3 with receipts
4101        for i in 0..3 {
4102            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4103            provider_rw
4104                .write_state(
4105                    &data.blocks[i].1,
4106                    crate::OriginalValuesKnown::No,
4107                    StateWriteConfig::default(),
4108                )
4109                .unwrap();
4110        }
4111        provider_rw.commit().unwrap();
4112
4113        let provider = factory.provider().unwrap();
4114        let result = provider.receipts_by_block_range(1..=3).unwrap();
4115
4116        // verify each block has one receipt
4117        assert_eq!(result.len(), 3);
4118        for block_receipts in &result {
4119            assert_eq!(block_receipts.len(), 1);
4120        }
4121    }
4122
4123    #[test]
4124    fn test_receipts_by_block_range_partial_range() {
4125        let factory = create_test_provider_factory();
4126        let data = BlockchainTestData::default();
4127
4128        let provider_rw = factory.provider_rw().unwrap();
4129        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4130        provider_rw
4131            .write_state(
4132                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4133                crate::OriginalValuesKnown::No,
4134                StateWriteConfig::default(),
4135            )
4136            .unwrap();
4137        for i in 0..3 {
4138            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4139            provider_rw
4140                .write_state(
4141                    &data.blocks[i].1,
4142                    crate::OriginalValuesKnown::No,
4143                    StateWriteConfig::default(),
4144                )
4145                .unwrap();
4146        }
4147        provider_rw.commit().unwrap();
4148
4149        let provider = factory.provider().unwrap();
4150
4151        // request range that includes both existing and non-existing blocks
4152        let result = provider.receipts_by_block_range(2..=5).unwrap();
4153        assert_eq!(result.len(), 4);
4154
4155        // blocks 2-3 should have receipts, blocks 4-5 should be empty
4156        assert_eq!(result[0].len(), 1); // block 2
4157        assert_eq!(result[1].len(), 1); // block 3
4158        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
4159        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
4160
4161        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
4162        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
4163    }
4164
4165    #[test]
4166    fn test_receipts_by_block_range_all_empty_blocks() {
4167        let factory = create_test_provider_factory();
4168        let mut rng = generators::rng();
4169
4170        // create blocks with no transactions
4171        let mut blocks = Vec::new();
4172        for i in 0..3 {
4173            let block =
4174                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
4175            blocks.push(block);
4176        }
4177
4178        let provider_rw = factory.provider_rw().unwrap();
4179        for block in blocks {
4180            provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4181        }
4182        provider_rw.commit().unwrap();
4183
4184        let provider = factory.provider().unwrap();
4185        let result = provider.receipts_by_block_range(1..=3).unwrap();
4186
4187        assert_eq!(result.len(), 3);
4188        for block_receipts in result {
4189            assert_eq!(block_receipts.len(), 0);
4190        }
4191    }
4192
4193    #[test]
4194    fn test_receipts_by_block_range_consistency_with_individual_calls() {
4195        let factory = create_test_provider_factory();
4196        let data = BlockchainTestData::default();
4197
4198        let provider_rw = factory.provider_rw().unwrap();
4199        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4200        provider_rw
4201            .write_state(
4202                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4203                crate::OriginalValuesKnown::No,
4204                StateWriteConfig::default(),
4205            )
4206            .unwrap();
4207        for i in 0..3 {
4208            provider_rw.insert_block(&data.blocks[i].0).unwrap();
4209            provider_rw
4210                .write_state(
4211                    &data.blocks[i].1,
4212                    crate::OriginalValuesKnown::No,
4213                    StateWriteConfig::default(),
4214                )
4215                .unwrap();
4216        }
4217        provider_rw.commit().unwrap();
4218
4219        let provider = factory.provider().unwrap();
4220
4221        // get receipts using block range method
4222        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
4223
4224        // get receipts using individual block calls
4225        let mut individual_results = Vec::new();
4226        for block_num in 1..=3 {
4227            let receipts =
4228                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
4229            individual_results.push(receipts);
4230        }
4231
4232        assert_eq!(range_result, individual_results);
4233    }
4234
4235    #[test]
4236    fn test_receipts_by_block_returns_none_for_missing_unpruned_receipts() {
4237        let factory = create_test_provider_factory();
4238        let data = BlockchainTestData::default();
4239
4240        let provider_rw = factory.provider_rw().unwrap();
4241        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4242        provider_rw.insert_block(&data.blocks[0].0).unwrap();
4243        provider_rw.commit().unwrap();
4244
4245        let provider = factory.provider().unwrap();
4246        assert!(provider.receipts_by_block(1.into()).unwrap().is_none());
4247    }
4248
4249    #[test]
4250    fn test_write_trie_updates_sorted() {
4251        use reth_trie::{
4252            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4253            BranchNodeCompact, StorageTrieEntry,
4254        };
4255
4256        let factory = create_test_provider_factory();
4257        let provider_rw = factory.provider_rw().unwrap();
4258
4259        // Pre-populate account trie with data that will be deleted
4260        {
4261            let tx = provider_rw.tx_ref();
4262            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4263
4264            // Add account node that will be deleted
4265            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4266            cursor
4267                .upsert(
4268                    to_delete,
4269                    &BranchNodeCompact::new(
4270                        0b1010_1010_1010_1010, // state_mask
4271                        0b0000_0000_0000_0000, // tree_mask
4272                        0b0000_0000_0000_0000, // hash_mask
4273                        vec![],
4274                        None,
4275                    ),
4276                )
4277                .unwrap();
4278
4279            // Add account node that will be updated
4280            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4281            cursor
4282                .upsert(
4283                    to_update,
4284                    &BranchNodeCompact::new(
4285                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4286                        0b0000_0000_0000_0000, // tree_mask
4287                        0b0000_0000_0000_0000, // hash_mask
4288                        vec![],
4289                        None,
4290                    ),
4291                )
4292                .unwrap();
4293        }
4294
4295        // Pre-populate storage tries with data
4296        let storage_address1 = B256::from([1u8; 32]);
4297        let storage_address2 = B256::from([2u8; 32]);
4298        {
4299            let tx = provider_rw.tx_ref();
4300            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4301
4302            // Add storage nodes for address1 (one will be deleted)
4303            storage_cursor
4304                .upsert(
4305                    storage_address1,
4306                    &StorageTrieEntry {
4307                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4308                        node: BranchNodeCompact::new(
4309                            0b0011_0011_0011_0011, // will be deleted
4310                            0b0000_0000_0000_0000,
4311                            0b0000_0000_0000_0000,
4312                            vec![],
4313                            None,
4314                        ),
4315                    },
4316                )
4317                .unwrap();
4318
4319            // Add storage nodes for address2 (will be wiped)
4320            storage_cursor
4321                .upsert(
4322                    storage_address2,
4323                    &StorageTrieEntry {
4324                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4325                        node: BranchNodeCompact::new(
4326                            0b1100_1100_1100_1100, // will be wiped
4327                            0b0000_0000_0000_0000,
4328                            0b0000_0000_0000_0000,
4329                            vec![],
4330                            None,
4331                        ),
4332                    },
4333                )
4334                .unwrap();
4335            storage_cursor
4336                .upsert(
4337                    storage_address2,
4338                    &StorageTrieEntry {
4339                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4340                        node: BranchNodeCompact::new(
4341                            0b0011_1100_0011_1100, // will be wiped
4342                            0b0000_0000_0000_0000,
4343                            0b0000_0000_0000_0000,
4344                            vec![],
4345                            None,
4346                        ),
4347                    },
4348                )
4349                .unwrap();
4350        }
4351
4352        // Create sorted account trie updates
4353        let account_nodes = vec![
4354            (
4355                Nibbles::from_nibbles([0x1, 0x2]),
4356                Some(BranchNodeCompact::new(
4357                    0b1111_1111_1111_1111, // state_mask (updated)
4358                    0b0000_0000_0000_0000, // tree_mask
4359                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4360                    vec![],
4361                    None,
4362                )),
4363            ),
4364            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4365            (
4366                Nibbles::from_nibbles([0x5, 0x6]),
4367                Some(BranchNodeCompact::new(
4368                    0b1111_1111_1111_1111, // state_mask
4369                    0b0000_0000_0000_0000, // tree_mask
4370                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4371                    vec![],
4372                    None,
4373                )),
4374            ),
4375        ];
4376
4377        // Create sorted storage trie updates
4378        let storage_trie1 = StorageTrieUpdatesSorted {
4379            is_deleted: false,
4380            storage_nodes: vec![
4381                (
4382                    Nibbles::from_nibbles([0x1, 0x0]),
4383                    Some(BranchNodeCompact::new(
4384                        0b1111_0000_0000_0000, // state_mask
4385                        0b0000_0000_0000_0000, // tree_mask
4386                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4387                        vec![],
4388                        None,
4389                    )),
4390                ),
4391                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4392            ],
4393        };
4394
4395        let storage_trie2 = StorageTrieUpdatesSorted {
4396            is_deleted: true, // Wipe all storage for this address
4397            storage_nodes: vec![],
4398        };
4399
4400        let mut storage_tries = B256Map::default();
4401        storage_tries.insert(storage_address1, storage_trie1);
4402        storage_tries.insert(storage_address2, storage_trie2);
4403
4404        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4405
4406        // Write the sorted trie updates
4407        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4408
4409        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4410        // storage deletion = 5
4411        assert_eq!(num_entries, 5);
4412
4413        // Verify account trie updates were written correctly
4414        let tx = provider_rw.tx_ref();
4415        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4416
4417        // Check first account node was updated
4418        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4419        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4420        assert!(entry1.is_some(), "Updated account node should exist");
4421        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4422        assert_eq!(
4423            entry1.unwrap().1.state_mask,
4424            expected_mask,
4425            "Account node should have updated state_mask"
4426        );
4427
4428        // Check deleted account node no longer exists
4429        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4430        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4431        assert!(entry2.is_none(), "Deleted account node should not exist");
4432
4433        // Check new account node exists
4434        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4435        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4436        assert!(entry3.is_some(), "New account node should exist");
4437
4438        // Verify storage trie updates were written correctly
4439        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4440
4441        // Check storage for address1
4442        let storage_entries1: Vec<_> = storage_cursor
4443            .walk_dup(Some(storage_address1), None)
4444            .unwrap()
4445            .collect::<Result<Vec<_>, _>>()
4446            .unwrap();
4447        assert_eq!(
4448            storage_entries1.len(),
4449            1,
4450            "Storage address1 should have 1 entry after deletion"
4451        );
4452        assert_eq!(
4453            storage_entries1[0].1.nibbles.0,
4454            Nibbles::from_nibbles([0x1, 0x0]),
4455            "Remaining entry should be [0x1, 0x0]"
4456        );
4457
4458        // Check storage for address2 was wiped
4459        let storage_entries2: Vec<_> = storage_cursor
4460            .walk_dup(Some(storage_address2), None)
4461            .unwrap()
4462            .collect::<Result<Vec<_>, _>>()
4463            .unwrap();
4464        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4465
4466        provider_rw.commit().unwrap();
4467    }
4468
4469    #[test]
4470    fn test_prunable_receipts_logic() {
4471        let insert_blocks =
4472            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4473                let mut rng = generators::rng();
4474                for block_num in 0..=tip_block {
4475                    let block = random_block(
4476                        &mut rng,
4477                        block_num,
4478                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4479                    );
4480                    provider_rw.insert_block(&block.try_recover().unwrap()).unwrap();
4481                }
4482            };
4483
4484        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4485            let outcome = ExecutionOutcome {
4486                first_block: block,
4487                receipts: vec![vec![Receipt {
4488                    tx_type: Default::default(),
4489                    success: true,
4490                    cumulative_gas_used: block, // identifier to assert against
4491                    logs: vec![],
4492                }]],
4493                ..Default::default()
4494            };
4495            provider_rw
4496                .write_state(&outcome, crate::OriginalValuesKnown::No, StateWriteConfig::default())
4497                .unwrap();
4498            provider_rw.commit().unwrap();
4499        };
4500
4501        // Legacy mode (receipts in DB) - should be prunable
4502        {
4503            let factory = create_test_provider_factory();
4504            let storage_settings = StorageSettings::v1();
4505            factory.set_storage_settings_cache(storage_settings);
4506            let factory = factory.with_prune_modes(PruneModes {
4507                receipts: Some(PruneMode::Before(100)),
4508                ..Default::default()
4509            });
4510
4511            let tip_block = 200u64;
4512            let first_block = 1u64;
4513
4514            // create chain
4515            let provider_rw = factory.provider_rw().unwrap();
4516            insert_blocks(&provider_rw, tip_block, 1);
4517            provider_rw.commit().unwrap();
4518
4519            write_receipts(
4520                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4521                first_block,
4522            );
4523            write_receipts(
4524                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4525                tip_block - 1,
4526            );
4527
4528            let provider = factory.provider().unwrap();
4529
4530            assert!(provider.receipts_by_block(0.into()).unwrap().is_none());
4531            assert!(provider
4532                .receipts_by_block((tip_block - 1).into())
4533                .unwrap()
4534                .is_some_and(|r| r.len() == 1));
4535        }
4536
4537        // Static files mode
4538        {
4539            let factory = create_test_provider_factory();
4540            let storage_settings = StorageSettings::v2();
4541            factory.set_storage_settings_cache(storage_settings);
4542            let factory = factory.with_prune_modes(PruneModes {
4543                receipts: Some(PruneMode::Before(2)),
4544                ..Default::default()
4545            });
4546
4547            let tip_block = 200u64;
4548
4549            // create chain
4550            let provider_rw = factory.provider_rw().unwrap();
4551            insert_blocks(&provider_rw, tip_block, 1);
4552            provider_rw.commit().unwrap();
4553
4554            // Attempt to write receipts for block 0 and 1 (should be skipped)
4555            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4556            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4557
4558            assert!(factory
4559                .static_file_provider()
4560                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4561                .is_none(),);
4562            assert!(factory
4563                .static_file_provider()
4564                .get_highest_static_file_block(StaticFileSegment::Receipts)
4565                .is_some_and(|b| b == 1),);
4566
4567            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4568            // static files.
4569            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4570            assert!(factory
4571                .static_file_provider()
4572                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4573                .is_some_and(|num| num == 2),);
4574
4575            // After having a receipt already in static files, attempt to skip the next receipt by
4576            // changing the prune mode. It should NOT skip it and should still write the receipt,
4577            // since static files do not support gaps.
4578            let factory = factory.with_prune_modes(PruneModes {
4579                receipts: Some(PruneMode::Before(100)),
4580                ..Default::default()
4581            });
4582            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4583            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4584            write_receipts(provider_rw, 3);
4585
4586            // Ensure we can only fetch the 2 last receipts.
4587            //
4588            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4589            // to the block number it belongs to easily identify and assert.
4590            let provider = factory.provider().unwrap();
4591            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4592            for (num, has_receipt) in [(0, false), (1, false), (2, true), (3, true)] {
4593                let receipts = provider.receipts_by_block(num.into()).unwrap();
4594                if has_receipt {
4595                    assert!(receipts.is_some_and(|r| r.len() == 1));
4596                } else {
4597                    assert!(receipts.is_none());
4598                }
4599
4600                let receipt = provider.receipt(num).unwrap();
4601                if has_receipt {
4602                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4603                } else {
4604                    assert!(receipt.is_none());
4605                }
4606            }
4607        }
4608    }
4609
4610    #[test]
4611    fn test_try_into_history_rejects_unexecuted_blocks() {
4612        use reth_storage_api::TryIntoHistoricalStateProvider;
4613
4614        let factory = create_test_provider_factory();
4615
4616        // Insert genesis block to have some data
4617        let data = BlockchainTestData::default();
4618        let provider_rw = factory.provider_rw().unwrap();
4619        provider_rw.insert_block(&data.genesis.try_recover().unwrap()).unwrap();
4620        provider_rw
4621            .write_state(
4622                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
4623                crate::OriginalValuesKnown::No,
4624                StateWriteConfig::default(),
4625            )
4626            .unwrap();
4627        provider_rw.commit().unwrap();
4628
4629        // Get a fresh provider - Execution checkpoint is 0, no receipts written beyond genesis
4630        let provider = factory.provider().unwrap();
4631
4632        // Requesting historical state for block 0 (executed) should succeed
4633        let result = provider.try_into_history_at_block(0);
4634        assert!(result.is_ok(), "Block 0 should be available");
4635
4636        // Get another provider and request state for block 100 (not executed)
4637        let provider = factory.provider().unwrap();
4638        let result = provider.try_into_history_at_block(100);
4639
4640        // Should fail with BlockNotExecuted error
4641        match result {
4642            Err(ProviderError::BlockNotExecuted { requested: 100, .. }) => {}
4643            Err(e) => panic!("Expected BlockNotExecuted error, got: {e:?}"),
4644            Ok(_) => panic!("Expected error, got Ok"),
4645        }
4646    }
4647
4648    #[test]
4649    fn test_unwind_storage_hashing_with_hashed_state() {
4650        let factory = create_test_provider_factory();
4651        let storage_settings = StorageSettings::v2();
4652        factory.set_storage_settings_cache(storage_settings);
4653
4654        let address = Address::random();
4655        let hashed_address = keccak256(address);
4656
4657        let plain_slot = B256::random();
4658        let hashed_slot = keccak256(plain_slot);
4659
4660        let current_value = U256::from(100);
4661        let old_value = U256::from(42);
4662
4663        let provider_rw = factory.provider_rw().unwrap();
4664        provider_rw
4665            .tx
4666            .cursor_dup_write::<tables::HashedStorages>()
4667            .unwrap()
4668            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4669            .unwrap();
4670
4671        let changesets = vec![(
4672            BlockNumberAddress((1, address)),
4673            StorageEntry { key: plain_slot, value: old_value },
4674        )];
4675
4676        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4677
4678        assert_eq!(result.len(), 1);
4679        assert!(result.contains_key(&hashed_address));
4680        assert!(result[&hashed_address].contains(&hashed_slot));
4681
4682        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4683        let entry = cursor
4684            .seek_by_key_subkey(hashed_address, hashed_slot)
4685            .unwrap()
4686            .expect("entry should exist");
4687        assert_eq!(entry.key, hashed_slot);
4688        assert_eq!(entry.value, old_value);
4689    }
4690
4691    #[test]
4692    fn test_write_and_remove_state_roundtrip_legacy() {
4693        let factory = create_test_provider_factory();
4694        let storage_settings = StorageSettings::v1();
4695        assert!(!storage_settings.use_hashed_state());
4696        factory.set_storage_settings_cache(storage_settings);
4697
4698        let address = Address::with_last_byte(1);
4699        let hashed_address = keccak256(address);
4700        let slot = U256::from(5);
4701        let slot_key = B256::from(slot);
4702        let hashed_slot = keccak256(slot_key);
4703
4704        let mut rng = generators::rng();
4705        let block0 =
4706            random_block(&mut rng, 0, BlockParams { tx_count: Some(0), ..Default::default() });
4707        let block1 =
4708            random_block(&mut rng, 1, BlockParams { tx_count: Some(0), ..Default::default() });
4709
4710        {
4711            let provider_rw = factory.provider_rw().unwrap();
4712            provider_rw.insert_block(&block0.try_recover().unwrap()).unwrap();
4713            provider_rw.insert_block(&block1.try_recover().unwrap()).unwrap();
4714            provider_rw
4715                .tx
4716                .cursor_write::<tables::PlainAccountState>()
4717                .unwrap()
4718                .upsert(address, &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })
4719                .unwrap();
4720            provider_rw.commit().unwrap();
4721        }
4722
4723        let provider_rw = factory.provider_rw().unwrap();
4724
4725        let mut state_init: BundleStateInit = AddressMap::default();
4726        let mut storage_map: B256Map<(U256, U256)> = B256Map::default();
4727        storage_map.insert(slot_key, (U256::ZERO, U256::from(10)));
4728        state_init.insert(
4729            address,
4730            (
4731                Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None }),
4732                Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }),
4733                storage_map,
4734            ),
4735        );
4736
4737        let mut reverts_init: RevertsInit = HashMap::default();
4738        let mut block_reverts: AddressMap<AccountRevertInit> = AddressMap::default();
4739        block_reverts.insert(
4740            address,
4741            (
4742                Some(Some(Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None })),
4743                vec![StorageEntry { key: slot_key, value: U256::ZERO }],
4744            ),
4745        );
4746        reverts_init.insert(1, block_reverts);
4747
4748        let execution_outcome =
4749            ExecutionOutcome::new_init(state_init, reverts_init, [], vec![vec![]], 1, vec![]);
4750
4751        provider_rw
4752            .write_state(
4753                &execution_outcome,
4754                OriginalValuesKnown::Yes,
4755                StateWriteConfig {
4756                    write_receipts: false,
4757                    write_account_changesets: true,
4758                    write_storage_changesets: true,
4759                },
4760            )
4761            .unwrap();
4762
4763        let hashed_state =
4764            execution_outcome.hash_state_slow::<reth_trie::KeccakKeyHasher>().into_sorted();
4765        provider_rw.write_hashed_state(&hashed_state).unwrap();
4766
4767        let account = provider_rw
4768            .tx
4769            .cursor_read::<tables::PlainAccountState>()
4770            .unwrap()
4771            .seek_exact(address)
4772            .unwrap()
4773            .unwrap()
4774            .1;
4775        assert_eq!(account.nonce, 1);
4776
4777        let storage_entry = provider_rw
4778            .tx
4779            .cursor_dup_read::<tables::PlainStorageState>()
4780            .unwrap()
4781            .seek_by_key_subkey(address, slot_key)
4782            .unwrap()
4783            .unwrap();
4784        assert_eq!(storage_entry.key, slot_key);
4785        assert_eq!(storage_entry.value, U256::from(10));
4786
4787        let hashed_entry = provider_rw
4788            .tx
4789            .cursor_dup_read::<tables::HashedStorages>()
4790            .unwrap()
4791            .seek_by_key_subkey(hashed_address, hashed_slot)
4792            .unwrap()
4793            .unwrap();
4794        assert_eq!(hashed_entry.key, hashed_slot);
4795        assert_eq!(hashed_entry.value, U256::from(10));
4796
4797        let account_cs_entries = provider_rw
4798            .tx
4799            .cursor_dup_read::<tables::AccountChangeSets>()
4800            .unwrap()
4801            .walk(Some(1))
4802            .unwrap()
4803            .collect::<Result<Vec<_>, _>>()
4804            .unwrap();
4805        assert!(!account_cs_entries.is_empty());
4806
4807        let storage_cs_entries = provider_rw
4808            .tx
4809            .cursor_read::<tables::StorageChangeSets>()
4810            .unwrap()
4811            .walk(Some(BlockNumberAddress((1, address))))
4812            .unwrap()
4813            .collect::<Result<Vec<_>, _>>()
4814            .unwrap();
4815        assert!(!storage_cs_entries.is_empty());
4816        assert_eq!(storage_cs_entries[0].1.key, slot_key);
4817
4818        provider_rw.remove_state_above(0).unwrap();
4819
4820        let restored_account = provider_rw
4821            .tx
4822            .cursor_read::<tables::PlainAccountState>()
4823            .unwrap()
4824            .seek_exact(address)
4825            .unwrap()
4826            .unwrap()
4827            .1;
4828        assert_eq!(restored_account.nonce, 0);
4829
4830        let storage_gone = provider_rw
4831            .tx
4832            .cursor_dup_read::<tables::PlainStorageState>()
4833            .unwrap()
4834            .seek_by_key_subkey(address, slot_key)
4835            .unwrap();
4836        assert!(storage_gone.is_none() || storage_gone.unwrap().key != slot_key);
4837
4838        let account_cs_after = provider_rw
4839            .tx
4840            .cursor_dup_read::<tables::AccountChangeSets>()
4841            .unwrap()
4842            .walk(Some(1))
4843            .unwrap()
4844            .collect::<Result<Vec<_>, _>>()
4845            .unwrap();
4846        assert!(account_cs_after.is_empty());
4847
4848        let storage_cs_after = provider_rw
4849            .tx
4850            .cursor_read::<tables::StorageChangeSets>()
4851            .unwrap()
4852            .walk(Some(BlockNumberAddress((1, address))))
4853            .unwrap()
4854            .collect::<Result<Vec<_>, _>>()
4855            .unwrap();
4856        assert!(storage_cs_after.is_empty());
4857    }
4858
4859    #[test]
4860    fn test_unwind_storage_hashing_legacy() {
4861        let factory = create_test_provider_factory();
4862        let storage_settings = StorageSettings::v1();
4863        assert!(!storage_settings.use_hashed_state());
4864        factory.set_storage_settings_cache(storage_settings);
4865
4866        let address = Address::random();
4867        let hashed_address = keccak256(address);
4868
4869        let plain_slot = B256::random();
4870        let hashed_slot = keccak256(plain_slot);
4871
4872        let current_value = U256::from(100);
4873        let old_value = U256::from(42);
4874
4875        let provider_rw = factory.provider_rw().unwrap();
4876        provider_rw
4877            .tx
4878            .cursor_dup_write::<tables::HashedStorages>()
4879            .unwrap()
4880            .upsert(hashed_address, &StorageEntry { key: hashed_slot, value: current_value })
4881            .unwrap();
4882
4883        let changesets = vec![(
4884            BlockNumberAddress((1, address)),
4885            StorageEntry { key: plain_slot, value: old_value },
4886        )];
4887
4888        let result = provider_rw.unwind_storage_hashing(changesets.into_iter()).unwrap();
4889
4890        assert_eq!(result.len(), 1);
4891        assert!(result.contains_key(&hashed_address));
4892        assert!(result[&hashed_address].contains(&hashed_slot));
4893
4894        let mut cursor = provider_rw.tx.cursor_dup_read::<tables::HashedStorages>().unwrap();
4895        let entry = cursor
4896            .seek_by_key_subkey(hashed_address, hashed_slot)
4897            .unwrap()
4898            .expect("entry should exist");
4899        assert_eq!(entry.key, hashed_slot);
4900        assert_eq!(entry.value, old_value);
4901    }
4902
4903    #[test]
4904    fn test_write_state_and_historical_read_hashed() {
4905        use reth_storage_api::StateProvider;
4906        use reth_trie::{HashedPostState, KeccakKeyHasher};
4907        use revm_database::BundleState;
4908        use revm_state::AccountInfo;
4909
4910        let factory = create_test_provider_factory();
4911        factory.set_storage_settings_cache(StorageSettings::v2());
4912
4913        let address = Address::with_last_byte(1);
4914        let slot = U256::from(5);
4915        let slot_key = B256::from(slot);
4916        let hashed_address = keccak256(address);
4917        let hashed_slot = keccak256(slot_key);
4918
4919        {
4920            let sf = factory.static_file_provider();
4921            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
4922            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
4923            hw.append_header(&h0, &B256::ZERO).unwrap();
4924            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
4925            hw.append_header(&h1, &B256::ZERO).unwrap();
4926            hw.commit().unwrap();
4927
4928            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
4929            aw.append_account_changeset(vec![], 0).unwrap();
4930            aw.commit().unwrap();
4931
4932            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
4933            sw.append_storage_changeset(vec![], 0).unwrap();
4934            sw.commit().unwrap();
4935        }
4936
4937        let provider_rw = factory.provider_rw().unwrap();
4938
4939        let bundle = BundleState::builder(1..=1)
4940            .state_present_account_info(
4941                address,
4942                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
4943            )
4944            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
4945            .revert_account_info(1, address, Some(None))
4946            .revert_storage(1, address, vec![(slot, U256::ZERO)])
4947            .build();
4948
4949        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
4950
4951        provider_rw
4952            .tx
4953            .put::<tables::BlockBodyIndices>(
4954                1,
4955                StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
4956            )
4957            .unwrap();
4958
4959        provider_rw
4960            .write_state(
4961                &execution_outcome,
4962                OriginalValuesKnown::Yes,
4963                StateWriteConfig {
4964                    write_receipts: false,
4965                    write_account_changesets: true,
4966                    write_storage_changesets: true,
4967                },
4968            )
4969            .unwrap();
4970
4971        let hashed_state =
4972            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
4973        provider_rw.write_hashed_state(&hashed_state).unwrap();
4974
4975        let plain_storage_entries = provider_rw
4976            .tx
4977            .cursor_dup_read::<tables::PlainStorageState>()
4978            .unwrap()
4979            .walk(None)
4980            .unwrap()
4981            .collect::<Result<Vec<_>, _>>()
4982            .unwrap();
4983        assert!(plain_storage_entries.is_empty());
4984
4985        let hashed_entry = provider_rw
4986            .tx
4987            .cursor_dup_read::<tables::HashedStorages>()
4988            .unwrap()
4989            .seek_by_key_subkey(hashed_address, hashed_slot)
4990            .unwrap()
4991            .unwrap();
4992        assert_eq!(hashed_entry.key, hashed_slot);
4993        assert_eq!(hashed_entry.value, U256::from(10));
4994
4995        provider_rw.static_file_provider().commit().unwrap();
4996
4997        let sf = factory.static_file_provider();
4998        let storage_cs = sf.storage_changeset(1).unwrap();
4999        assert!(!storage_cs.is_empty());
5000        assert_eq!(storage_cs[0].1.key, slot_key);
5001
5002        let account_cs = sf.account_block_changeset(1).unwrap();
5003        assert!(!account_cs.is_empty());
5004        assert_eq!(account_cs[0].address, address);
5005
5006        let historical_value =
5007            HistoricalStateProviderRef::new(&*provider_rw, 0, ChangesetCache::new())
5008                .storage(address, slot_key)
5009                .unwrap();
5010        assert_eq!(historical_value, None);
5011    }
5012
5013    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
5014    enum StorageMode {
5015        V1,
5016        V2,
5017    }
5018
5019    fn run_save_blocks_and_verify(mode: StorageMode) {
5020        use alloy_primitives::map::{FbBuildHasher, HashMap};
5021
5022        let factory = create_test_provider_factory();
5023
5024        match mode {
5025            StorageMode::V1 => factory.set_storage_settings_cache(StorageSettings::v1()),
5026            StorageMode::V2 => factory.set_storage_settings_cache(StorageSettings::v2()),
5027        }
5028
5029        let num_blocks = 3u64;
5030        let accounts_per_block = 5usize;
5031        let slots_per_account = 3usize;
5032
5033        let genesis = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
5034            SealedHeader::new(
5035                Header { number: 0, difficulty: U256::from(1), ..Default::default() },
5036                B256::ZERO,
5037            ),
5038            Default::default(),
5039        );
5040
5041        let genesis_executed = ExecutedBlock::new(
5042            Arc::new(genesis.try_recover().unwrap()),
5043            Arc::new(BlockExecutionOutput {
5044                result: BlockExecutionResult {
5045                    receipts: vec![],
5046                    requests: Default::default(),
5047                    gas_used: 0,
5048                    blob_gas_used: 0,
5049                },
5050                state: Default::default(),
5051            }),
5052            ComputedTrieData::default(),
5053        );
5054        let provider_rw = factory.provider_rw().unwrap();
5055        provider_rw.save_blocks(vec![genesis_executed], SaveBlocksMode::Full).unwrap();
5056        provider_rw.commit().unwrap();
5057
5058        let mut blocks: Vec<ExecutedBlock> = Vec::new();
5059        let mut parent_hash = B256::ZERO;
5060
5061        for block_num in 1..=num_blocks {
5062            let mut builder = BundleState::builder(block_num..=block_num);
5063
5064            for acct_idx in 0..accounts_per_block {
5065                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5066                let info = AccountInfo {
5067                    nonce: block_num,
5068                    balance: U256::from(block_num * 100 + acct_idx as u64),
5069                    ..Default::default()
5070                };
5071
5072                let storage: HashMap<U256, (U256, U256), FbBuildHasher<32>> = (1..=
5073                    slots_per_account as u64)
5074                    .map(|s| {
5075                        (
5076                            U256::from(s + acct_idx as u64 * 100),
5077                            (U256::ZERO, U256::from(block_num * 1000 + s)),
5078                        )
5079                    })
5080                    .collect();
5081
5082                let revert_storage: Vec<(U256, U256)> = (1..=slots_per_account as u64)
5083                    .map(|s| (U256::from(s + acct_idx as u64 * 100), U256::ZERO))
5084                    .collect();
5085
5086                builder = builder
5087                    .state_present_account_info(address, info)
5088                    .revert_account_info(block_num, address, Some(None))
5089                    .state_storage(address, storage)
5090                    .revert_storage(block_num, address, revert_storage);
5091            }
5092
5093            let bundle = builder.build();
5094
5095            let hashed_state =
5096                HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5097
5098            let header = Header {
5099                number: block_num,
5100                parent_hash,
5101                difficulty: U256::from(1),
5102                ..Default::default()
5103            };
5104            let block = SealedBlock::<reth_ethereum_primitives::Block>::seal_parts(
5105                header,
5106                Default::default(),
5107            );
5108            parent_hash = block.hash();
5109
5110            let executed = ExecutedBlock::new(
5111                Arc::new(block.try_recover().unwrap()),
5112                Arc::new(BlockExecutionOutput {
5113                    result: BlockExecutionResult {
5114                        receipts: vec![],
5115                        requests: Default::default(),
5116                        gas_used: 0,
5117                        blob_gas_used: 0,
5118                    },
5119                    state: bundle,
5120                }),
5121                ComputedTrieData { hashed_state: Arc::new(hashed_state), ..Default::default() },
5122            );
5123            blocks.push(executed);
5124        }
5125
5126        let provider_rw = factory.provider_rw().unwrap();
5127        provider_rw.save_blocks(blocks, SaveBlocksMode::Full).unwrap();
5128        provider_rw.commit().unwrap();
5129
5130        let provider = factory.provider().unwrap();
5131
5132        for block_num in 1..=num_blocks {
5133            for acct_idx in 0..accounts_per_block {
5134                let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5135                let hashed_address = keccak256(address);
5136
5137                let ha_entry = provider
5138                    .tx_ref()
5139                    .cursor_read::<tables::HashedAccounts>()
5140                    .unwrap()
5141                    .seek_exact(hashed_address)
5142                    .unwrap();
5143                assert!(
5144                    ha_entry.is_some(),
5145                    "HashedAccounts missing for block {block_num} acct {acct_idx}"
5146                );
5147
5148                for s in 1..=slots_per_account as u64 {
5149                    let slot = U256::from(s + acct_idx as u64 * 100);
5150                    let slot_key = B256::from(slot);
5151                    let hashed_slot = keccak256(slot_key);
5152
5153                    let hs_entry = provider
5154                        .tx_ref()
5155                        .cursor_dup_read::<tables::HashedStorages>()
5156                        .unwrap()
5157                        .seek_by_key_subkey(hashed_address, hashed_slot)
5158                        .unwrap();
5159                    assert!(
5160                        hs_entry.is_some(),
5161                        "HashedStorages missing for block {block_num} acct {acct_idx} slot {s}"
5162                    );
5163                    let entry = hs_entry.unwrap();
5164                    assert_eq!(entry.key, hashed_slot);
5165                    assert_eq!(entry.value, U256::from(block_num * 1000 + s));
5166                }
5167            }
5168        }
5169
5170        for block_num in 1..=num_blocks {
5171            let header = provider.header_by_number(block_num).unwrap();
5172            assert!(header.is_some(), "Header missing for block {block_num}");
5173
5174            let indices = provider.block_body_indices(block_num).unwrap();
5175            assert!(indices.is_some(), "BlockBodyIndices missing for block {block_num}");
5176        }
5177
5178        let plain_accounts = provider.tx_ref().entries::<tables::PlainAccountState>().unwrap();
5179        let plain_storage = provider.tx_ref().entries::<tables::PlainStorageState>().unwrap();
5180
5181        if mode == StorageMode::V2 {
5182            assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5183            assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5184
5185            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5186            assert_eq!(mdbx_account_cs, 0, "v2: AccountChangeSets in MDBX should be empty");
5187
5188            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5189            assert_eq!(mdbx_storage_cs, 0, "v2: StorageChangeSets in MDBX should be empty");
5190
5191            provider.static_file_provider().commit().unwrap();
5192            let sf = factory.static_file_provider();
5193
5194            for block_num in 1..=num_blocks {
5195                let account_cs = sf.account_block_changeset(block_num).unwrap();
5196                assert!(
5197                    !account_cs.is_empty(),
5198                    "v2: static file AccountChangeSets should exist for block {block_num}"
5199                );
5200
5201                let storage_cs = sf.storage_changeset(block_num).unwrap();
5202                assert!(
5203                    !storage_cs.is_empty(),
5204                    "v2: static file StorageChangeSets should exist for block {block_num}"
5205                );
5206
5207                for (_, entry) in &storage_cs {
5208                    assert!(
5209                        entry.key != keccak256(entry.key),
5210                        "v2: static file storage changeset should have plain slot keys"
5211                    );
5212                }
5213            }
5214
5215            let rocksdb = factory.rocksdb_provider();
5216            for block_num in 1..=num_blocks {
5217                for acct_idx in 0..accounts_per_block {
5218                    let address = Address::with_last_byte((block_num * 10 + acct_idx as u64) as u8);
5219                    let shards = rocksdb.account_history_shards(address).unwrap();
5220                    assert!(
5221                        !shards.is_empty(),
5222                        "v2: RocksDB AccountsHistory missing for block {block_num} acct {acct_idx}"
5223                    );
5224
5225                    for s in 1..=slots_per_account as u64 {
5226                        let slot = U256::from(s + acct_idx as u64 * 100);
5227                        let slot_key = B256::from(slot);
5228                        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5229                        assert!(
5230                            !shards.is_empty(),
5231                            "v2: RocksDB StoragesHistory missing for block {block_num} acct {acct_idx} slot {s}"
5232                        );
5233                    }
5234                }
5235            }
5236        } else {
5237            assert!(plain_accounts > 0, "v1: PlainAccountState should not be empty");
5238            assert!(plain_storage > 0, "v1: PlainStorageState should not be empty");
5239
5240            let mdbx_account_cs = provider.tx_ref().entries::<tables::AccountChangeSets>().unwrap();
5241            assert!(mdbx_account_cs > 0, "v1: AccountChangeSets in MDBX should not be empty");
5242
5243            let mdbx_storage_cs = provider.tx_ref().entries::<tables::StorageChangeSets>().unwrap();
5244            assert!(mdbx_storage_cs > 0, "v1: StorageChangeSets in MDBX should not be empty");
5245
5246            for block_num in 1..=num_blocks {
5247                let storage_entries: Vec<_> = provider
5248                    .tx_ref()
5249                    .cursor_dup_read::<tables::StorageChangeSets>()
5250                    .unwrap()
5251                    .walk_range(BlockNumberAddress::range(block_num..=block_num))
5252                    .unwrap()
5253                    .collect::<Result<Vec<_>, _>>()
5254                    .unwrap();
5255                assert!(
5256                    !storage_entries.is_empty(),
5257                    "v1: MDBX StorageChangeSets should have entries for block {block_num}"
5258                );
5259
5260                for (_, entry) in &storage_entries {
5261                    let slot_key = B256::from(entry.key);
5262                    assert!(
5263                        slot_key != keccak256(slot_key),
5264                        "v1: storage changeset keys should be plain (not hashed)"
5265                    );
5266                }
5267            }
5268
5269            let mdbx_account_history =
5270                provider.tx_ref().entries::<tables::AccountsHistory>().unwrap();
5271            assert!(mdbx_account_history > 0, "v1: AccountsHistory in MDBX should not be empty");
5272
5273            let mdbx_storage_history =
5274                provider.tx_ref().entries::<tables::StoragesHistory>().unwrap();
5275            assert!(mdbx_storage_history > 0, "v1: StoragesHistory in MDBX should not be empty");
5276        }
5277    }
5278
5279    #[test]
5280    fn test_save_blocks_v1_table_assertions() {
5281        run_save_blocks_and_verify(StorageMode::V1);
5282    }
5283
5284    #[test]
5285    fn test_save_blocks_v2_table_assertions() {
5286        run_save_blocks_and_verify(StorageMode::V2);
5287    }
5288
5289    #[test]
5290    fn test_write_and_remove_state_roundtrip_v2() {
5291        let factory = create_test_provider_factory();
5292        let storage_settings = StorageSettings::v2();
5293        assert!(storage_settings.use_hashed_state());
5294        factory.set_storage_settings_cache(storage_settings);
5295
5296        let address = Address::with_last_byte(1);
5297        let hashed_address = keccak256(address);
5298        let slot = U256::from(5);
5299        let slot_key = B256::from(slot);
5300        let hashed_slot = keccak256(slot_key);
5301
5302        {
5303            let sf = factory.static_file_provider();
5304            let mut hw = sf.latest_writer(StaticFileSegment::Headers).unwrap();
5305            let h0 = alloy_consensus::Header { number: 0, ..Default::default() };
5306            hw.append_header(&h0, &B256::ZERO).unwrap();
5307            let h1 = alloy_consensus::Header { number: 1, ..Default::default() };
5308            hw.append_header(&h1, &B256::ZERO).unwrap();
5309            hw.commit().unwrap();
5310
5311            let mut aw = sf.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
5312            aw.append_account_changeset(vec![], 0).unwrap();
5313            aw.commit().unwrap();
5314
5315            let mut sw = sf.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
5316            sw.append_storage_changeset(vec![], 0).unwrap();
5317            sw.commit().unwrap();
5318        }
5319
5320        {
5321            let provider_rw = factory.provider_rw().unwrap();
5322            provider_rw
5323                .tx
5324                .put::<tables::BlockBodyIndices>(
5325                    0,
5326                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5327                )
5328                .unwrap();
5329            provider_rw
5330                .tx
5331                .put::<tables::BlockBodyIndices>(
5332                    1,
5333                    StoredBlockBodyIndices { first_tx_num: 0, tx_count: 0 },
5334                )
5335                .unwrap();
5336            provider_rw
5337                .tx
5338                .cursor_write::<tables::HashedAccounts>()
5339                .unwrap()
5340                .upsert(
5341                    hashed_address,
5342                    &Account { nonce: 0, balance: U256::ZERO, bytecode_hash: None },
5343                )
5344                .unwrap();
5345            provider_rw.commit().unwrap();
5346        }
5347
5348        let provider_rw = factory.provider_rw().unwrap();
5349
5350        let bundle = BundleState::builder(1..=1)
5351            .state_present_account_info(
5352                address,
5353                AccountInfo { nonce: 1, balance: U256::from(10), ..Default::default() },
5354            )
5355            .state_storage(address, HashMap::from_iter([(slot, (U256::ZERO, U256::from(10)))]))
5356            .revert_account_info(1, address, Some(None))
5357            .revert_storage(1, address, vec![(slot, U256::ZERO)])
5358            .build();
5359
5360        let execution_outcome = ExecutionOutcome::new(bundle.clone(), vec![vec![]], 1, Vec::new());
5361
5362        provider_rw
5363            .write_state(
5364                &execution_outcome,
5365                OriginalValuesKnown::Yes,
5366                StateWriteConfig {
5367                    write_receipts: false,
5368                    write_account_changesets: true,
5369                    write_storage_changesets: true,
5370                },
5371            )
5372            .unwrap();
5373
5374        let hashed_state =
5375            HashedPostState::from_bundle_state::<KeccakKeyHasher>(bundle.state()).into_sorted();
5376        provider_rw.write_hashed_state(&hashed_state).unwrap();
5377
5378        let hashed_account = provider_rw
5379            .tx
5380            .cursor_read::<tables::HashedAccounts>()
5381            .unwrap()
5382            .seek_exact(hashed_address)
5383            .unwrap()
5384            .unwrap()
5385            .1;
5386        assert_eq!(hashed_account.nonce, 1);
5387
5388        let hashed_entry = provider_rw
5389            .tx
5390            .cursor_dup_read::<tables::HashedStorages>()
5391            .unwrap()
5392            .seek_by_key_subkey(hashed_address, hashed_slot)
5393            .unwrap()
5394            .unwrap();
5395        assert_eq!(hashed_entry.key, hashed_slot);
5396        assert_eq!(hashed_entry.value, U256::from(10));
5397
5398        let plain_accounts = provider_rw.tx.entries::<tables::PlainAccountState>().unwrap();
5399        assert_eq!(plain_accounts, 0, "v2: PlainAccountState should be empty");
5400
5401        let plain_storage = provider_rw.tx.entries::<tables::PlainStorageState>().unwrap();
5402        assert_eq!(plain_storage, 0, "v2: PlainStorageState should be empty");
5403
5404        provider_rw.static_file_provider().commit().unwrap();
5405
5406        let sf = factory.static_file_provider();
5407        let storage_cs = sf.storage_changeset(1).unwrap();
5408        assert!(!storage_cs.is_empty(), "v2: storage changesets should be in static files");
5409        assert_eq!(storage_cs[0].1.key, slot_key, "v2: changeset key should be plain");
5410
5411        provider_rw.remove_state_above(0).unwrap();
5412
5413        let restored_account = provider_rw
5414            .tx
5415            .cursor_read::<tables::HashedAccounts>()
5416            .unwrap()
5417            .seek_exact(hashed_address)
5418            .unwrap();
5419        assert!(
5420            restored_account.is_none(),
5421            "v2: account should be removed (didn't exist before block 1)"
5422        );
5423
5424        let storage_gone = provider_rw
5425            .tx
5426            .cursor_dup_read::<tables::HashedStorages>()
5427            .unwrap()
5428            .seek_by_key_subkey(hashed_address, hashed_slot)
5429            .unwrap();
5430        assert!(
5431            storage_gone.is_none() || storage_gone.unwrap().key != hashed_slot,
5432            "v2: storage should be reverted (removed or different key)"
5433        );
5434
5435        let mdbx_storage_cs = provider_rw.tx.entries::<tables::StorageChangeSets>().unwrap();
5436        assert_eq!(mdbx_storage_cs, 0, "v2: MDBX StorageChangeSets should remain empty");
5437
5438        let mdbx_account_cs = provider_rw.tx.entries::<tables::AccountChangeSets>().unwrap();
5439        assert_eq!(mdbx_account_cs, 0, "v2: MDBX AccountChangeSets should remain empty");
5440    }
5441
5442    #[test]
5443    fn test_unwind_storage_history_indices_v2() {
5444        let factory = create_test_provider_factory();
5445        factory.set_storage_settings_cache(StorageSettings::v2());
5446
5447        let address = Address::with_last_byte(1);
5448        let slot_key = B256::from(U256::from(42));
5449
5450        {
5451            let rocksdb = factory.rocksdb_provider();
5452            let mut batch = rocksdb.batch();
5453            batch.append_storage_history_shard(address, slot_key, vec![3u64, 7, 10]).unwrap();
5454            batch.commit().unwrap();
5455
5456            let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5457            assert!(!shards.is_empty(), "history should be written to rocksdb");
5458        }
5459
5460        let provider_rw = factory.provider_rw().unwrap();
5461
5462        let changesets = vec![
5463            (
5464                BlockNumberAddress((7, address)),
5465                StorageEntry { key: slot_key, value: U256::from(5) },
5466            ),
5467            (
5468                BlockNumberAddress((10, address)),
5469                StorageEntry { key: slot_key, value: U256::from(8) },
5470            ),
5471        ];
5472
5473        let count = provider_rw.unwind_storage_history_indices(changesets.into_iter()).unwrap();
5474        assert_eq!(count, 2);
5475
5476        provider_rw.commit().unwrap();
5477
5478        let rocksdb = factory.rocksdb_provider();
5479        let shards = rocksdb.storage_history_shards(address, slot_key).unwrap();
5480
5481        assert!(
5482            !shards.is_empty(),
5483            "history shards should still exist with block 3 after partial unwind"
5484        );
5485
5486        let all_blocks: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
5487        assert!(all_blocks.contains(&3), "block 3 should remain");
5488        assert!(!all_blocks.contains(&7), "block 7 should be unwound");
5489        assert!(!all_blocks.contains(&10), "block 10 should be unwound");
5490    }
5491}