reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::{
3        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4    },
5    providers::{
6        database::{chain::ChainStorage, metrics},
7        static_file::StaticFileWriter,
8        NodeTypesForProvider, StaticFileProvider,
9    },
10    to_range,
11    traits::{
12        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
13    },
14    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
15    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
16    DBProvider, EitherReader, EitherWriter, EitherWriterDestination, HashingWriter, HeaderProvider,
17    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
18    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
19    PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
20    StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
21    StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
22    TrieReader, TrieWriter,
23};
24use alloy_consensus::{
25    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
26    BlockHeader, TxReceipt,
27};
28use alloy_eips::BlockHashOrNumber;
29use alloy_primitives::{
30    keccak256,
31    map::{hash_map, B256Map, HashMap, HashSet},
32    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
33};
34use itertools::Itertools;
35use parking_lot::RwLock;
36use rayon::slice::ParallelSliceMut;
37use reth_chain_state::ExecutedBlock;
38use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
39use reth_db_api::{
40    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
41    database::Database,
42    models::{
43        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
44        BlockNumberHashedAddress, ShardedKey, StorageSettings, StoredBlockBodyIndices,
45    },
46    table::Table,
47    tables,
48    transaction::{DbTx, DbTxMut},
49    BlockNumberList, PlainAccountState, PlainStorageState,
50};
51use reth_execution_types::{Chain, ExecutionOutcome};
52use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
53use reth_primitives_traits::{
54    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
55};
56use reth_prune_types::{
57    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
58};
59use reth_stages_types::{StageCheckpoint, StageId};
60use reth_static_file_types::StaticFileSegment;
61use reth_storage_api::{
62    BlockBodyIndicesProvider, BlockBodyReader, MetadataProvider, MetadataWriter,
63    NodePrimitivesProvider, StateProvider, StorageChangeSetReader, StorageSettingsCache,
64    TryIntoHistoricalStateProvider,
65};
66use reth_storage_errors::provider::ProviderResult;
67use reth_trie::{
68    trie_cursor::{
69        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
70        TrieCursorIter,
71    },
72    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
73    HashedPostStateSorted, StoredNibbles, StoredNibblesSubKey, TrieChangeSetsEntry,
74};
75use reth_trie_db::{
76    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
77};
78use revm_database::states::{
79    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
80};
81use std::{
82    cmp::Ordering,
83    collections::{BTreeMap, BTreeSet},
84    fmt::Debug,
85    ops::{Deref, DerefMut, Range, RangeBounds, RangeFrom, RangeInclusive},
86    sync::Arc,
87};
88use tracing::{debug, trace};
89
90/// A [`DatabaseProvider`] that holds a read-only database transaction.
91pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
92
93/// A [`DatabaseProvider`] that holds a read-write database transaction.
94///
95/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
96/// Once that issue is solved, we can probably revert back to being an alias type.
97#[derive(Debug)]
98pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
99    pub DatabaseProvider<<DB as Database>::TXMut, N>,
100);
101
102impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
103    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
104
105    fn deref(&self) -> &Self::Target {
106        &self.0
107    }
108}
109
110impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
111    fn deref_mut(&mut self) -> &mut Self::Target {
112        &mut self.0
113    }
114}
115
116impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
117    for DatabaseProviderRW<DB, N>
118{
119    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
120        &self.0
121    }
122}
123
124impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
125    /// Commit database transaction and static file if it exists.
126    pub fn commit(self) -> ProviderResult<bool> {
127        self.0.commit()
128    }
129
130    /// Consume `DbTx` or `DbTxMut`.
131    pub fn into_tx(self) -> <DB as Database>::TXMut {
132        self.0.into_tx()
133    }
134
135    /// Override the minimum pruning distance for testing purposes.
136    #[cfg(any(test, feature = "test-utils"))]
137    pub const fn with_minimum_pruning_distance(mut self, distance: u64) -> Self {
138        self.0.minimum_pruning_distance = distance;
139        self
140    }
141}
142
143impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
144    for DatabaseProvider<<DB as Database>::TXMut, N>
145{
146    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
147        provider.0
148    }
149}
150
151/// A provider struct that fetches data from the database.
152/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
153#[derive(Debug)]
154pub struct DatabaseProvider<TX, N: NodeTypes> {
155    /// Database transaction.
156    tx: TX,
157    /// Chain spec
158    chain_spec: Arc<N::ChainSpec>,
159    /// Static File provider
160    static_file_provider: StaticFileProvider<N::Primitives>,
161    /// Pruning configuration
162    prune_modes: PruneModes,
163    /// Node storage handler.
164    storage: Arc<N::Storage>,
165    /// Storage configuration settings for this node
166    storage_settings: Arc<RwLock<StorageSettings>>,
167    /// Minimum distance from tip required for pruning
168    minimum_pruning_distance: u64,
169}
170
171impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
172    /// Returns reference to prune modes.
173    pub const fn prune_modes_ref(&self) -> &PruneModes {
174        &self.prune_modes
175    }
176}
177
178impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
179    /// State provider for latest state
180    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
181        trace!(target: "providers::db", "Returning latest state provider");
182        Box::new(LatestStateProviderRef::new(self))
183    }
184
185    /// Storage provider for state at that given block hash
186    pub fn history_by_block_hash<'a>(
187        &'a self,
188        block_hash: BlockHash,
189    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
190        let mut block_number =
191            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
192        if block_number == self.best_block_number().unwrap_or_default() &&
193            block_number == self.last_block_number().unwrap_or_default()
194        {
195            return Ok(Box::new(LatestStateProviderRef::new(self)))
196        }
197
198        // +1 as the changeset that we want is the one that was applied after this block.
199        block_number += 1;
200
201        let account_history_prune_checkpoint =
202            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
203        let storage_history_prune_checkpoint =
204            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
205
206        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
207
208        // If we pruned account or storage history, we can't return state on every historical block.
209        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
210        if let Some(prune_checkpoint_block_number) =
211            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
212        {
213            state_provider = state_provider.with_lowest_available_account_history_block_number(
214                prune_checkpoint_block_number + 1,
215            );
216        }
217        if let Some(prune_checkpoint_block_number) =
218            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
219        {
220            state_provider = state_provider.with_lowest_available_storage_history_block_number(
221                prune_checkpoint_block_number + 1,
222            );
223        }
224
225        Ok(Box::new(state_provider))
226    }
227
228    #[cfg(feature = "test-utils")]
229    /// Sets the prune modes for provider.
230    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
231        self.prune_modes = prune_modes;
232    }
233}
234
235impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
236    type Primitives = N::Primitives;
237}
238
239impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
240    /// Returns a static file provider
241    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
242        self.static_file_provider.clone()
243    }
244
245    fn get_static_file_writer(
246        &self,
247        block: BlockNumber,
248        segment: StaticFileSegment,
249    ) -> ProviderResult<crate::providers::StaticFileProviderRWRefMut<'_, Self::Primitives>> {
250        self.static_file_provider.get_writer(block, segment)
251    }
252}
253
254impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
255    for DatabaseProvider<TX, N>
256{
257    type ChainSpec = N::ChainSpec;
258
259    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
260        self.chain_spec.clone()
261    }
262}
263
264impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
265    /// Creates a provider with an inner read-write transaction.
266    pub const fn new_rw(
267        tx: TX,
268        chain_spec: Arc<N::ChainSpec>,
269        static_file_provider: StaticFileProvider<N::Primitives>,
270        prune_modes: PruneModes,
271        storage: Arc<N::Storage>,
272        storage_settings: Arc<RwLock<StorageSettings>>,
273    ) -> Self {
274        Self {
275            tx,
276            chain_spec,
277            static_file_provider,
278            prune_modes,
279            storage,
280            storage_settings,
281            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
282        }
283    }
284}
285
286impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
287    fn as_ref(&self) -> &Self {
288        self
289    }
290}
291
292impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
293    /// Writes executed blocks and state to storage.
294    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
295        if blocks.is_empty() {
296            debug!(target: "providers::db", "Attempted to write empty block range");
297            return Ok(())
298        }
299
300        // NOTE: checked non-empty above
301        let first_block = blocks.first().unwrap().recovered_block();
302
303        let last_block = blocks.last().unwrap().recovered_block();
304        let first_number = first_block.number();
305        let last_block_number = last_block.number();
306
307        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
308
309        // TODO: Do performant / batched writes for each type of object
310        // instead of a loop over all blocks,
311        // meaning:
312        //  * blocks
313        //  * state
314        //  * hashed state
315        //  * trie updates (cannot naively extend, need helper)
316        //  * indices (already done basically)
317        // Insert the blocks
318        for ExecutedBlock { recovered_block, execution_output, hashed_state, trie_updates } in
319            blocks
320        {
321            let block_number = recovered_block.number();
322            self.insert_block(Arc::unwrap_or_clone(recovered_block))?;
323
324            // Write state and changesets to the database.
325            // Must be written after blocks because of the receipt lookup.
326            self.write_state(&execution_output, OriginalValuesKnown::No)?;
327
328            // insert hashes and intermediate merkle nodes
329            self.write_hashed_state(&hashed_state)?;
330
331            self.write_trie_changesets(block_number, &trie_updates, None)?;
332            self.write_trie_updates_sorted(&trie_updates)?;
333        }
334
335        // update history indices
336        self.update_history_indices(first_number..=last_block_number)?;
337
338        // Update pipeline progress
339        self.update_pipeline_stages(last_block_number, false)?;
340
341        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
342
343        Ok(())
344    }
345
346    /// Unwinds trie state starting at and including the given block.
347    ///
348    /// This includes calculating the resulted state root and comparing it with the parent block
349    /// state root.
350    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
351        let changed_accounts = self
352            .tx
353            .cursor_read::<tables::AccountChangeSets>()?
354            .walk_range(from..)?
355            .collect::<Result<Vec<_>, _>>()?;
356
357        // Unwind account hashes.
358        self.unwind_account_hashing(changed_accounts.iter())?;
359
360        // Unwind account history indices.
361        self.unwind_account_history_indices(changed_accounts.iter())?;
362
363        let storage_start = BlockNumberAddress((from, Address::ZERO));
364        let changed_storages = self
365            .tx
366            .cursor_read::<tables::StorageChangeSets>()?
367            .walk_range(storage_start..)?
368            .collect::<Result<Vec<_>, _>>()?;
369
370        // Unwind storage hashes.
371        self.unwind_storage_hashing(changed_storages.iter().copied())?;
372
373        // Unwind storage history indices.
374        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
375
376        // Unwind accounts/storages trie tables using the revert.
377        let trie_revert = self.trie_reverts(from)?;
378        self.write_trie_updates_sorted(&trie_revert)?;
379
380        // Clear trie changesets which have been unwound.
381        self.clear_trie_changesets_from(from)?;
382
383        Ok(())
384    }
385
386    /// Removes receipts from all transactions starting with provided number (inclusive).
387    fn remove_receipts_from(
388        &self,
389        from_tx: TxNumber,
390        last_block: BlockNumber,
391    ) -> ProviderResult<()> {
392        // iterate over block body and remove receipts
393        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
394
395        if EitherWriter::receipts_destination(self).is_static_file() {
396            let static_file_receipt_num =
397                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
398
399            let to_delete = static_file_receipt_num
400                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
401                .unwrap_or_default();
402
403            self.static_file_provider
404                .latest_writer(StaticFileSegment::Receipts)?
405                .prune_receipts(to_delete, last_block)?;
406        }
407
408        Ok(())
409    }
410}
411
412impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
413    fn try_into_history_at_block(
414        self,
415        mut block_number: BlockNumber,
416    ) -> ProviderResult<StateProviderBox> {
417        // if the block number is the same as the currently best block number on disk we can use the
418        // latest state provider here
419        if block_number == self.best_block_number().unwrap_or_default() {
420            return Ok(Box::new(LatestStateProvider::new(self)))
421        }
422
423        // +1 as the changeset that we want is the one that was applied after this block.
424        block_number += 1;
425
426        let account_history_prune_checkpoint =
427            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
428        let storage_history_prune_checkpoint =
429            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
430
431        let mut state_provider = HistoricalStateProvider::new(self, block_number);
432
433        // If we pruned account or storage history, we can't return state on every historical block.
434        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
435        if let Some(prune_checkpoint_block_number) =
436            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
437        {
438            state_provider = state_provider.with_lowest_available_account_history_block_number(
439                prune_checkpoint_block_number + 1,
440            );
441        }
442        if let Some(prune_checkpoint_block_number) =
443            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
444        {
445            state_provider = state_provider.with_lowest_available_storage_history_block_number(
446                prune_checkpoint_block_number + 1,
447            );
448        }
449
450        Ok(Box::new(state_provider))
451    }
452}
453
454/// For a given key, unwind all history shards that contain block numbers at or above the given
455/// block number.
456///
457/// S - Sharded key subtype.
458/// T - Table to walk over.
459/// C - Cursor implementation.
460///
461/// This function walks the entries from the given start key and deletes all shards that belong to
462/// the key and contain block numbers at or above the given block number. Shards entirely below
463/// the block number are preserved.
464///
465/// The boundary shard (the shard that spans across the block number) is removed from the database.
466/// Any indices that are below the block number are filtered out and returned for reinsertion.
467/// The boundary shard is returned for reinsertion (if it's not empty).
468fn unwind_history_shards<S, T, C>(
469    cursor: &mut C,
470    start_key: T::Key,
471    block_number: BlockNumber,
472    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
473) -> ProviderResult<Vec<u64>>
474where
475    T: Table<Value = BlockNumberList>,
476    T::Key: AsRef<ShardedKey<S>>,
477    C: DbCursorRO<T> + DbCursorRW<T>,
478{
479    // Start from the given key and iterate through shards
480    let mut item = cursor.seek_exact(start_key)?;
481    while let Some((sharded_key, list)) = item {
482        // If the shard does not belong to the key, break.
483        if !shard_belongs_to_key(&sharded_key) {
484            break
485        }
486
487        // Always delete the current shard from the database first
488        // We'll decide later what (if anything) to reinsert
489        cursor.delete_current()?;
490
491        // Get the first (lowest) block number in this shard
492        // All block numbers in a shard are sorted in ascending order
493        let first = list.iter().next().expect("List can't be empty");
494
495        // Case 1: Entire shard is at or above the unwinding point
496        // Keep it deleted (don't return anything for reinsertion)
497        if first >= block_number {
498            item = cursor.prev()?;
499            continue
500        }
501        // Case 2: This is a boundary shard (spans across the unwinding point)
502        // The shard contains some blocks below and some at/above the unwinding point
503        else if block_number <= sharded_key.as_ref().highest_block_number {
504            // Return only the block numbers that are below the unwinding point
505            // These will be reinserted to preserve the historical data
506            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
507        }
508        // Case 3: Entire shard is below the unwinding point
509        // Return all block numbers for reinsertion (preserve entire shard)
510        return Ok(list.iter().collect::<Vec<_>>())
511    }
512
513    // No shards found or all processed
514    Ok(Vec::new())
515}
516
517impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
518    /// Creates a provider with an inner read-only transaction.
519    pub const fn new(
520        tx: TX,
521        chain_spec: Arc<N::ChainSpec>,
522        static_file_provider: StaticFileProvider<N::Primitives>,
523        prune_modes: PruneModes,
524        storage: Arc<N::Storage>,
525        storage_settings: Arc<RwLock<StorageSettings>>,
526    ) -> Self {
527        Self {
528            tx,
529            chain_spec,
530            static_file_provider,
531            prune_modes,
532            storage,
533            storage_settings,
534            minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
535        }
536    }
537
538    /// Consume `DbTx` or `DbTxMut`.
539    pub fn into_tx(self) -> TX {
540        self.tx
541    }
542
543    /// Pass `DbTx` or `DbTxMut` mutable reference.
544    pub const fn tx_mut(&mut self) -> &mut TX {
545        &mut self.tx
546    }
547
548    /// Pass `DbTx` or `DbTxMut` immutable reference.
549    pub const fn tx_ref(&self) -> &TX {
550        &self.tx
551    }
552
553    /// Returns a reference to the chain specification.
554    pub fn chain_spec(&self) -> &N::ChainSpec {
555        &self.chain_spec
556    }
557}
558
559impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
560    fn recovered_block<H, HF, B, BF>(
561        &self,
562        id: BlockHashOrNumber,
563        _transaction_kind: TransactionVariant,
564        header_by_number: HF,
565        construct_block: BF,
566    ) -> ProviderResult<Option<B>>
567    where
568        H: AsRef<HeaderTy<N>>,
569        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
570        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
571    {
572        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
573        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
574
575        // Get the block body
576        //
577        // If the body indices are not found, this means that the transactions either do not exist
578        // in the database yet, or they do exit but are not indexed. If they exist but are not
579        // indexed, we don't have enough information to return the block anyways, so we return
580        // `None`.
581        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
582
583        let tx_range = body.tx_num_range();
584
585        let (transactions, senders) = if tx_range.is_empty() {
586            (vec![], vec![])
587        } else {
588            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
589        };
590
591        let body = self
592            .storage
593            .reader()
594            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
595            .pop()
596            .ok_or(ProviderError::InvalidStorageOutput)?;
597
598        construct_block(header, body, senders)
599    }
600
601    /// Returns a range of blocks from the database.
602    ///
603    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
604    /// construct blocks from the following inputs:
605    ///     – Header
606    ///     - Range of transaction numbers
607    ///     – Ommers
608    ///     – Withdrawals
609    ///     – Senders
610    fn block_range<F, H, HF, R>(
611        &self,
612        range: RangeInclusive<BlockNumber>,
613        headers_range: HF,
614        mut assemble_block: F,
615    ) -> ProviderResult<Vec<R>>
616    where
617        H: AsRef<HeaderTy<N>>,
618        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
619        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
620    {
621        if range.is_empty() {
622            return Ok(Vec::new())
623        }
624
625        let len = range.end().saturating_sub(*range.start()) as usize;
626        let mut blocks = Vec::with_capacity(len);
627
628        let headers = headers_range(range.clone())?;
629
630        // If the body indices are not found, this means that the transactions either do
631        // not exist in the database yet, or they do exit but are
632        // not indexed. If they exist but are not indexed, we don't
633        // have enough information to return the block anyways, so
634        // we skip the block.
635        let present_headers = self
636            .block_body_indices_range(range)?
637            .into_iter()
638            .map(|b| b.tx_num_range())
639            .zip(headers)
640            .collect::<Vec<_>>();
641
642        let mut inputs = Vec::new();
643        for (tx_range, header) in &present_headers {
644            let transactions = if tx_range.is_empty() {
645                Vec::new()
646            } else {
647                self.transactions_by_tx_range(tx_range.clone())?
648            };
649
650            inputs.push((header.as_ref(), transactions));
651        }
652
653        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
654
655        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
656            blocks.push(assemble_block(header, body, tx_range)?);
657        }
658
659        Ok(blocks)
660    }
661
662    /// Returns a range of blocks from the database, along with the senders of each
663    /// transaction in the blocks.
664    ///
665    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
666    /// construct blocks from the following inputs:
667    ///     – Header
668    ///     - Transactions
669    ///     – Ommers
670    ///     – Withdrawals
671    ///     – Senders
672    fn block_with_senders_range<H, HF, B, BF>(
673        &self,
674        range: RangeInclusive<BlockNumber>,
675        headers_range: HF,
676        assemble_block: BF,
677    ) -> ProviderResult<Vec<B>>
678    where
679        H: AsRef<HeaderTy<N>>,
680        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
681        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
682    {
683        self.block_range(range, headers_range, |header, body, tx_range| {
684            let senders = if tx_range.is_empty() {
685                Vec::new()
686            } else {
687                let known_senders: HashMap<TxNumber, Address> =
688                    EitherReader::new_senders(self)?.senders_by_tx_range(tx_range.clone())?;
689
690                let mut senders = Vec::with_capacity(body.transactions().len());
691                for (tx_num, tx) in tx_range.zip(body.transactions()) {
692                    match known_senders.get(&tx_num) {
693                        None => {
694                            // recover the sender from the transaction if not found
695                            let sender = tx.recover_signer_unchecked()?;
696                            senders.push(sender);
697                        }
698                        Some(sender) => senders.push(*sender),
699                    }
700                }
701
702                senders
703            };
704
705            assemble_block(header, body, senders)
706        })
707    }
708
709    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
710    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
711    /// account changesets.
712    fn populate_bundle_state<A, S>(
713        &self,
714        account_changeset: Vec<(u64, AccountBeforeTx)>,
715        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
716        plain_accounts_cursor: &mut A,
717        plain_storage_cursor: &mut S,
718    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
719    where
720        A: DbCursorRO<PlainAccountState>,
721        S: DbDupCursorRO<PlainStorageState>,
722    {
723        // iterate previous value and get plain state value to create changeset
724        // Double option around Account represent if Account state is know (first option) and
725        // account is removed (Second Option)
726        let mut state: BundleStateInit = HashMap::default();
727
728        // This is not working for blocks that are not at tip. as plain state is not the last
729        // state of end range. We should rename the functions or add support to access
730        // History state. Accessing history state can be tricky but we are not gaining
731        // anything.
732
733        let mut reverts: RevertsInit = HashMap::default();
734
735        // add account changeset changes
736        for (block_number, account_before) in account_changeset.into_iter().rev() {
737            let AccountBeforeTx { info: old_info, address } = account_before;
738            match state.entry(address) {
739                hash_map::Entry::Vacant(entry) => {
740                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
741                    entry.insert((old_info, new_info, HashMap::default()));
742                }
743                hash_map::Entry::Occupied(mut entry) => {
744                    // overwrite old account state.
745                    entry.get_mut().0 = old_info;
746                }
747            }
748            // insert old info into reverts.
749            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
750        }
751
752        // add storage changeset changes
753        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
754            let BlockNumberAddress((block_number, address)) = block_and_address;
755            // get account state or insert from plain state.
756            let account_state = match state.entry(address) {
757                hash_map::Entry::Vacant(entry) => {
758                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
759                    entry.insert((present_info, present_info, HashMap::default()))
760                }
761                hash_map::Entry::Occupied(entry) => entry.into_mut(),
762            };
763
764            // match storage.
765            match account_state.2.entry(old_storage.key) {
766                hash_map::Entry::Vacant(entry) => {
767                    let new_storage = plain_storage_cursor
768                        .seek_by_key_subkey(address, old_storage.key)?
769                        .filter(|storage| storage.key == old_storage.key)
770                        .unwrap_or_default();
771                    entry.insert((old_storage.value, new_storage.value));
772                }
773                hash_map::Entry::Occupied(mut entry) => {
774                    entry.get_mut().0 = old_storage.value;
775                }
776            };
777
778            reverts
779                .entry(block_number)
780                .or_default()
781                .entry(address)
782                .or_default()
783                .1
784                .push(old_storage);
785        }
786
787        Ok((state, reverts))
788    }
789}
790
791impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
792    /// Load shard and remove it. If list is empty, last shard was full or
793    /// there are no shards at all.
794    fn take_shard<T>(
795        &self,
796        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
797        key: T::Key,
798    ) -> ProviderResult<Vec<u64>>
799    where
800        T: Table<Value = BlockNumberList>,
801    {
802        if let Some((_, list)) = cursor.seek_exact(key)? {
803            // delete old shard so new one can be inserted.
804            cursor.delete_current()?;
805            let list = list.iter().collect::<Vec<_>>();
806            return Ok(list)
807        }
808        Ok(Vec::new())
809    }
810
811    /// Insert history index to the database.
812    ///
813    /// For each updated partial key, this function removes the last shard from
814    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
815    /// inserts the new shards back into the database.
816    ///
817    /// This function is used by history indexing stages.
818    fn append_history_index<P, T>(
819        &self,
820        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
821        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
822    ) -> ProviderResult<()>
823    where
824        P: Copy,
825        T: Table<Value = BlockNumberList>,
826    {
827        let mut cursor = self.tx.cursor_write::<T>()?;
828        for (partial_key, indices) in index_updates {
829            let mut last_shard =
830                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
831            last_shard.extend(indices);
832            // Chunk indices and insert them in shards of N size.
833            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
834            while let Some(list) = chunks.next() {
835                let highest_block_number = if chunks.peek().is_some() {
836                    *list.last().expect("`chunks` does not return empty list")
837                } else {
838                    // Insert last list with `u64::MAX`.
839                    u64::MAX
840                };
841                cursor.insert(
842                    sharded_key_factory(partial_key, highest_block_number),
843                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
844                )?;
845            }
846        }
847        Ok(())
848    }
849}
850
851impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
852    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
853        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
854    }
855}
856
857impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
858    fn changed_accounts_with_range(
859        &self,
860        range: impl RangeBounds<BlockNumber>,
861    ) -> ProviderResult<BTreeSet<Address>> {
862        self.tx
863            .cursor_read::<tables::AccountChangeSets>()?
864            .walk_range(range)?
865            .map(|entry| {
866                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
867            })
868            .collect()
869    }
870
871    fn basic_accounts(
872        &self,
873        iter: impl IntoIterator<Item = Address>,
874    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
875        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
876        Ok(iter
877            .into_iter()
878            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
879            .collect::<Result<Vec<_>, _>>()?)
880    }
881
882    fn changed_accounts_and_blocks_with_range(
883        &self,
884        range: RangeInclusive<BlockNumber>,
885    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
886        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
887
888        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
889            BTreeMap::new(),
890            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
891                let (index, account) = entry?;
892                accounts.entry(account.address).or_default().push(index);
893                Ok(accounts)
894            },
895        )?;
896
897        Ok(account_transitions)
898    }
899}
900
901impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
902    fn storage_changeset(
903        &self,
904        block_number: BlockNumber,
905    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
906        let range = block_number..=block_number;
907        let storage_range = BlockNumberAddress::range(range);
908        self.tx
909            .cursor_dup_read::<tables::StorageChangeSets>()?
910            .walk_range(storage_range)?
911            .map(|result| -> ProviderResult<_> { Ok(result?) })
912            .collect()
913    }
914}
915
916impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
917    fn account_block_changeset(
918        &self,
919        block_number: BlockNumber,
920    ) -> ProviderResult<Vec<AccountBeforeTx>> {
921        let range = block_number..=block_number;
922        self.tx
923            .cursor_read::<tables::AccountChangeSets>()?
924            .walk_range(range)?
925            .map(|result| -> ProviderResult<_> {
926                let (_, account_before) = result?;
927                Ok(account_before)
928            })
929            .collect()
930    }
931
932    fn get_account_before_block(
933        &self,
934        block_number: BlockNumber,
935        address: Address,
936    ) -> ProviderResult<Option<AccountBeforeTx>> {
937        self.tx
938            .cursor_dup_read::<tables::AccountChangeSets>()?
939            .seek_by_key_subkey(block_number, address)?
940            .filter(|acc| acc.address == address)
941            .map(Ok)
942            .transpose()
943    }
944}
945
946impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
947    for DatabaseProvider<TX, N>
948{
949    type Header = HeaderTy<N>;
950
951    fn local_tip_header(
952        &self,
953        highest_uninterrupted_block: BlockNumber,
954    ) -> ProviderResult<SealedHeader<Self::Header>> {
955        let static_file_provider = self.static_file_provider();
956
957        // Make sure Headers static file is at the same height. If it's further, this
958        // input execution was interrupted previously and we need to unwind the static file.
959        let next_static_file_block_num = static_file_provider
960            .get_highest_static_file_block(StaticFileSegment::Headers)
961            .map(|id| id + 1)
962            .unwrap_or_default();
963        let next_block = highest_uninterrupted_block + 1;
964
965        match next_static_file_block_num.cmp(&next_block) {
966            // The node shutdown between an executed static file commit and before the database
967            // commit, so we need to unwind the static files.
968            Ordering::Greater => {
969                let mut static_file_producer =
970                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
971                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
972                // Since this is a database <-> static file inconsistency, we commit the change
973                // straight away.
974                static_file_producer.commit()?
975            }
976            Ordering::Less => {
977                // There's either missing or corrupted files.
978                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
979            }
980            Ordering::Equal => {}
981        }
982
983        let local_head = static_file_provider
984            .sealed_header(highest_uninterrupted_block)?
985            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
986
987        Ok(local_head)
988    }
989}
990
991impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
992    type Header = HeaderTy<N>;
993
994    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
995        if let Some(num) = self.block_number(block_hash)? {
996            Ok(self.header_by_number(num)?)
997        } else {
998            Ok(None)
999        }
1000    }
1001
1002    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1003        self.static_file_provider.header_by_number(num)
1004    }
1005
1006    fn headers_range(
1007        &self,
1008        range: impl RangeBounds<BlockNumber>,
1009    ) -> ProviderResult<Vec<Self::Header>> {
1010        self.static_file_provider.headers_range(range)
1011    }
1012
1013    fn sealed_header(
1014        &self,
1015        number: BlockNumber,
1016    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1017        self.static_file_provider.sealed_header(number)
1018    }
1019
1020    fn sealed_headers_while(
1021        &self,
1022        range: impl RangeBounds<BlockNumber>,
1023        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1024    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1025        self.static_file_provider.sealed_headers_while(range, predicate)
1026    }
1027}
1028
1029impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1030    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1031        self.static_file_provider.block_hash(number)
1032    }
1033
1034    fn canonical_hashes_range(
1035        &self,
1036        start: BlockNumber,
1037        end: BlockNumber,
1038    ) -> ProviderResult<Vec<B256>> {
1039        self.static_file_provider.canonical_hashes_range(start, end)
1040    }
1041}
1042
1043impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1044    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1045        let best_number = self.best_block_number()?;
1046        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1047        Ok(ChainInfo { best_hash, best_number })
1048    }
1049
1050    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1051        // The best block number is tracked via the finished stage which gets updated in the same tx
1052        // when new blocks committed
1053        Ok(self
1054            .get_stage_checkpoint(StageId::Finish)?
1055            .map(|checkpoint| checkpoint.block_number)
1056            .unwrap_or_default())
1057    }
1058
1059    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1060        self.static_file_provider.last_block_number()
1061    }
1062
1063    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1064        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1065    }
1066}
1067
1068impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1069    type Block = BlockTy<N>;
1070
1071    fn find_block_by_hash(
1072        &self,
1073        hash: B256,
1074        source: BlockSource,
1075    ) -> ProviderResult<Option<Self::Block>> {
1076        if source.is_canonical() {
1077            self.block(hash.into())
1078        } else {
1079            Ok(None)
1080        }
1081    }
1082
1083    /// Returns the block with matching number from database.
1084    ///
1085    /// If the header for this block is not found, this returns `None`.
1086    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1087    /// will return None.
1088    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1089        if let Some(number) = self.convert_hash_or_number(id)? &&
1090            let Some(header) = self.header_by_number(number)?
1091        {
1092            // If the body indices are not found, this means that the transactions either do not
1093            // exist in the database yet, or they do exit but are not indexed.
1094            // If they exist but are not indexed, we don't have enough
1095            // information to return the block anyways, so we return `None`.
1096            let Some(transactions) = self.transactions_by_block(number.into())? else {
1097                return Ok(None)
1098            };
1099
1100            let body = self
1101                .storage
1102                .reader()
1103                .read_block_bodies(self, vec![(&header, transactions)])?
1104                .pop()
1105                .ok_or(ProviderError::InvalidStorageOutput)?;
1106
1107            return Ok(Some(Self::Block::new(header, body)))
1108        }
1109
1110        Ok(None)
1111    }
1112
1113    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1114        Ok(None)
1115    }
1116
1117    fn pending_block_and_receipts(
1118        &self,
1119    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1120        Ok(None)
1121    }
1122
1123    /// Returns the block with senders with matching number or hash from database.
1124    ///
1125    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1126    /// spot, and we want fast querying.**
1127    ///
1128    /// If the header for this block is not found, this returns `None`.
1129    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1130    /// will return None.
1131    fn recovered_block(
1132        &self,
1133        id: BlockHashOrNumber,
1134        transaction_kind: TransactionVariant,
1135    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1136        self.recovered_block(
1137            id,
1138            transaction_kind,
1139            |block_number| self.header_by_number(block_number),
1140            |header, body, senders| {
1141                Self::Block::new(header, body)
1142                    // Note: we're using unchecked here because we know the block contains valid txs
1143                    // wrt to its height and can ignore the s value check so pre
1144                    // EIP-2 txs are allowed
1145                    .try_into_recovered_unchecked(senders)
1146                    .map(Some)
1147                    .map_err(|_| ProviderError::SenderRecoveryError)
1148            },
1149        )
1150    }
1151
1152    fn sealed_block_with_senders(
1153        &self,
1154        id: BlockHashOrNumber,
1155        transaction_kind: TransactionVariant,
1156    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1157        self.recovered_block(
1158            id,
1159            transaction_kind,
1160            |block_number| self.sealed_header(block_number),
1161            |header, body, senders| {
1162                Self::Block::new_sealed(header, body)
1163                    // Note: we're using unchecked here because we know the block contains valid txs
1164                    // wrt to its height and can ignore the s value check so pre
1165                    // EIP-2 txs are allowed
1166                    .try_with_senders_unchecked(senders)
1167                    .map(Some)
1168                    .map_err(|_| ProviderError::SenderRecoveryError)
1169            },
1170        )
1171    }
1172
1173    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1174        self.block_range(
1175            range,
1176            |range| self.headers_range(range),
1177            |header, body, _| Ok(Self::Block::new(header, body)),
1178        )
1179    }
1180
1181    fn block_with_senders_range(
1182        &self,
1183        range: RangeInclusive<BlockNumber>,
1184    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1185        self.block_with_senders_range(
1186            range,
1187            |range| self.headers_range(range),
1188            |header, body, senders| {
1189                Self::Block::new(header, body)
1190                    .try_into_recovered_unchecked(senders)
1191                    .map_err(|_| ProviderError::SenderRecoveryError)
1192            },
1193        )
1194    }
1195
1196    fn recovered_block_range(
1197        &self,
1198        range: RangeInclusive<BlockNumber>,
1199    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1200        self.block_with_senders_range(
1201            range,
1202            |range| self.sealed_headers_range(range),
1203            |header, body, senders| {
1204                Self::Block::new_sealed(header, body)
1205                    .try_with_senders(senders)
1206                    .map_err(|_| ProviderError::SenderRecoveryError)
1207            },
1208        )
1209    }
1210
1211    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1212        Ok(self
1213            .tx
1214            .cursor_read::<tables::TransactionBlocks>()?
1215            .seek(id)
1216            .map(|b| b.map(|(_, bn)| bn))?)
1217    }
1218}
1219
1220impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1221    for DatabaseProvider<TX, N>
1222{
1223    /// Recovers transaction hashes by walking through `Transactions` table and
1224    /// calculating them in a parallel manner. Returned unsorted.
1225    fn transaction_hashes_by_range(
1226        &self,
1227        tx_range: Range<TxNumber>,
1228    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1229        self.static_file_provider.transaction_hashes_by_range(tx_range)
1230    }
1231}
1232
1233// Calculates the hash of the given transaction
1234impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1235    type Transaction = TxTy<N>;
1236
1237    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1238        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1239    }
1240
1241    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1242        self.static_file_provider.transaction_by_id(id)
1243    }
1244
1245    fn transaction_by_id_unhashed(
1246        &self,
1247        id: TxNumber,
1248    ) -> ProviderResult<Option<Self::Transaction>> {
1249        self.static_file_provider.transaction_by_id_unhashed(id)
1250    }
1251
1252    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1253        if let Some(id) = self.transaction_id(hash)? {
1254            Ok(self.transaction_by_id_unhashed(id)?)
1255        } else {
1256            Ok(None)
1257        }
1258    }
1259
1260    fn transaction_by_hash_with_meta(
1261        &self,
1262        tx_hash: TxHash,
1263    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1264        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1265            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1266            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1267            let Some(sealed_header) = self.sealed_header(block_number)?
1268        {
1269            let (header, block_hash) = sealed_header.split();
1270            if let Some(block_body) = self.block_body_indices(block_number)? {
1271                // the index of the tx in the block is the offset:
1272                // len([start..tx_id])
1273                // NOTE: `transaction_id` is always `>=` the block's first
1274                // index
1275                let index = transaction_id - block_body.first_tx_num();
1276
1277                let meta = TransactionMeta {
1278                    tx_hash,
1279                    index,
1280                    block_hash,
1281                    block_number,
1282                    base_fee: header.base_fee_per_gas(),
1283                    excess_blob_gas: header.excess_blob_gas(),
1284                    timestamp: header.timestamp(),
1285                };
1286
1287                return Ok(Some((transaction, meta)))
1288            }
1289        }
1290
1291        Ok(None)
1292    }
1293
1294    fn transactions_by_block(
1295        &self,
1296        id: BlockHashOrNumber,
1297    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1298        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1299            let Some(body) = self.block_body_indices(block_number)?
1300        {
1301            let tx_range = body.tx_num_range();
1302            return if tx_range.is_empty() {
1303                Ok(Some(Vec::new()))
1304            } else {
1305                self.transactions_by_tx_range(tx_range).map(Some)
1306            }
1307        }
1308        Ok(None)
1309    }
1310
1311    fn transactions_by_block_range(
1312        &self,
1313        range: impl RangeBounds<BlockNumber>,
1314    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1315        let range = to_range(range);
1316
1317        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1318            .into_iter()
1319            .map(|body| {
1320                let tx_num_range = body.tx_num_range();
1321                if tx_num_range.is_empty() {
1322                    Ok(Vec::new())
1323                } else {
1324                    self.transactions_by_tx_range(tx_num_range)
1325                }
1326            })
1327            .collect()
1328    }
1329
1330    fn transactions_by_tx_range(
1331        &self,
1332        range: impl RangeBounds<TxNumber>,
1333    ) -> ProviderResult<Vec<Self::Transaction>> {
1334        self.static_file_provider.transactions_by_tx_range(range)
1335    }
1336
1337    fn senders_by_tx_range(
1338        &self,
1339        range: impl RangeBounds<TxNumber>,
1340    ) -> ProviderResult<Vec<Address>> {
1341        if EitherWriterDestination::senders(self).is_static_file() {
1342            self.static_file_provider.senders_by_tx_range(range)
1343        } else {
1344            self.cursor_read_collect::<tables::TransactionSenders>(range)
1345        }
1346    }
1347
1348    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1349        if EitherWriterDestination::senders(self).is_static_file() {
1350            self.static_file_provider.transaction_sender(id)
1351        } else {
1352            Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1353        }
1354    }
1355}
1356
1357impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1358    type Receipt = ReceiptTy<N>;
1359
1360    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1361        self.static_file_provider.get_with_static_file_or_database(
1362            StaticFileSegment::Receipts,
1363            id,
1364            |static_file| static_file.receipt(id),
1365            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1366        )
1367    }
1368
1369    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1370        if let Some(id) = self.transaction_id(hash)? {
1371            self.receipt(id)
1372        } else {
1373            Ok(None)
1374        }
1375    }
1376
1377    fn receipts_by_block(
1378        &self,
1379        block: BlockHashOrNumber,
1380    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1381        if let Some(number) = self.convert_hash_or_number(block)? &&
1382            let Some(body) = self.block_body_indices(number)?
1383        {
1384            let tx_range = body.tx_num_range();
1385            return if tx_range.is_empty() {
1386                Ok(Some(Vec::new()))
1387            } else {
1388                self.receipts_by_tx_range(tx_range).map(Some)
1389            }
1390        }
1391        Ok(None)
1392    }
1393
1394    fn receipts_by_tx_range(
1395        &self,
1396        range: impl RangeBounds<TxNumber>,
1397    ) -> ProviderResult<Vec<Self::Receipt>> {
1398        self.static_file_provider.get_range_with_static_file_or_database(
1399            StaticFileSegment::Receipts,
1400            to_range(range),
1401            |static_file, range, _| static_file.receipts_by_tx_range(range),
1402            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1403            |_| true,
1404        )
1405    }
1406
1407    fn receipts_by_block_range(
1408        &self,
1409        block_range: RangeInclusive<BlockNumber>,
1410    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1411        if block_range.is_empty() {
1412            return Ok(Vec::new());
1413        }
1414
1415        // collect block body indices for each block in the range
1416        let mut block_body_indices = Vec::new();
1417        for block_num in block_range {
1418            if let Some(indices) = self.block_body_indices(block_num)? {
1419                block_body_indices.push(indices);
1420            } else {
1421                // use default indices for missing blocks (empty block)
1422                block_body_indices.push(StoredBlockBodyIndices::default());
1423            }
1424        }
1425
1426        if block_body_indices.is_empty() {
1427            return Ok(Vec::new());
1428        }
1429
1430        // find blocks with transactions to determine transaction range
1431        let non_empty_blocks: Vec<_> =
1432            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1433
1434        if non_empty_blocks.is_empty() {
1435            // all blocks are empty
1436            return Ok(vec![Vec::new(); block_body_indices.len()]);
1437        }
1438
1439        // calculate the overall transaction range
1440        let first_tx = non_empty_blocks[0].first_tx_num();
1441        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1442
1443        // fetch all receipts in the transaction range
1444        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1445        let mut receipts_iter = all_receipts.into_iter();
1446
1447        // distribute receipts to their respective blocks
1448        let mut result = Vec::with_capacity(block_body_indices.len());
1449        for indices in &block_body_indices {
1450            if indices.tx_count == 0 {
1451                result.push(Vec::new());
1452            } else {
1453                let block_receipts =
1454                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1455                result.push(block_receipts);
1456            }
1457        }
1458
1459        Ok(result)
1460    }
1461}
1462
1463impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1464    for DatabaseProvider<TX, N>
1465{
1466    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1467        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1468    }
1469
1470    fn block_body_indices_range(
1471        &self,
1472        range: RangeInclusive<BlockNumber>,
1473    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1474        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1475    }
1476}
1477
1478impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1479    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1480        Ok(if let Some(encoded) = id.get_pre_encoded() {
1481            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1482        } else {
1483            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1484        })
1485    }
1486
1487    /// Get stage checkpoint progress.
1488    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1489        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1490    }
1491
1492    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1493        self.tx
1494            .cursor_read::<tables::StageCheckpoints>()?
1495            .walk(None)?
1496            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1497            .map_err(ProviderError::Database)
1498    }
1499}
1500
1501impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1502    /// Save stage checkpoint.
1503    fn save_stage_checkpoint(
1504        &self,
1505        id: StageId,
1506        checkpoint: StageCheckpoint,
1507    ) -> ProviderResult<()> {
1508        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1509    }
1510
1511    /// Save stage checkpoint progress.
1512    fn save_stage_checkpoint_progress(
1513        &self,
1514        id: StageId,
1515        checkpoint: Vec<u8>,
1516    ) -> ProviderResult<()> {
1517        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1518    }
1519
1520    fn update_pipeline_stages(
1521        &self,
1522        block_number: BlockNumber,
1523        drop_stage_checkpoint: bool,
1524    ) -> ProviderResult<()> {
1525        // iterate over all existing stages in the table and update its progress.
1526        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1527        for stage_id in StageId::ALL {
1528            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1529            cursor.upsert(
1530                stage_id.to_string(),
1531                &StageCheckpoint {
1532                    block_number,
1533                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1534                },
1535            )?;
1536        }
1537
1538        Ok(())
1539    }
1540}
1541
1542impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1543    fn plain_state_storages(
1544        &self,
1545        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1546    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1547        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1548
1549        addresses_with_keys
1550            .into_iter()
1551            .map(|(address, storage)| {
1552                storage
1553                    .into_iter()
1554                    .map(|key| -> ProviderResult<_> {
1555                        Ok(plain_storage
1556                            .seek_by_key_subkey(address, key)?
1557                            .filter(|v| v.key == key)
1558                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1559                    })
1560                    .collect::<ProviderResult<Vec<_>>>()
1561                    .map(|storage| (address, storage))
1562            })
1563            .collect::<ProviderResult<Vec<(_, _)>>>()
1564    }
1565
1566    fn changed_storages_with_range(
1567        &self,
1568        range: RangeInclusive<BlockNumber>,
1569    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1570        self.tx
1571            .cursor_read::<tables::StorageChangeSets>()?
1572            .walk_range(BlockNumberAddress::range(range))?
1573            // fold all storages and save its old state so we can remove it from HashedStorage
1574            // it is needed as it is dup table.
1575            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1576                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1577                accounts.entry(address).or_default().insert(storage_entry.key);
1578                Ok(accounts)
1579            })
1580    }
1581
1582    fn changed_storages_and_blocks_with_range(
1583        &self,
1584        range: RangeInclusive<BlockNumber>,
1585    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1586        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1587
1588        let storage_changeset_lists =
1589            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1590                BTreeMap::new(),
1591                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1592                    let (index, storage) = entry?;
1593                    storages
1594                        .entry((index.address(), storage.key))
1595                        .or_default()
1596                        .push(index.block_number());
1597                    Ok(storages)
1598                },
1599            )?;
1600
1601        Ok(storage_changeset_lists)
1602    }
1603}
1604
1605impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1606    for DatabaseProvider<TX, N>
1607{
1608    type Receipt = ReceiptTy<N>;
1609
1610    fn write_state(
1611        &self,
1612        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1613        is_value_known: OriginalValuesKnown,
1614    ) -> ProviderResult<()> {
1615        let first_block = execution_outcome.first_block();
1616        let block_count = execution_outcome.len() as u64;
1617        let last_block = execution_outcome.last_block();
1618        let block_range = first_block..=last_block;
1619
1620        let tip = self.last_block_number()?.max(last_block);
1621
1622        let (plain_state, reverts) =
1623            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1624
1625        self.write_state_reverts(reverts, first_block)?;
1626        self.write_state_changes(plain_state)?;
1627
1628        // Fetch the first transaction number for each block in the range
1629        let block_indices: Vec<_> = self
1630            .block_body_indices_range(block_range)?
1631            .into_iter()
1632            .map(|b| b.first_tx_num)
1633            .collect();
1634
1635        // Ensure all expected blocks are present.
1636        if block_indices.len() < block_count as usize {
1637            let missing_blocks = block_count - block_indices.len() as u64;
1638            return Err(ProviderError::BlockBodyIndicesNotFound(
1639                last_block.saturating_sub(missing_blocks - 1),
1640            ));
1641        }
1642
1643        let mut receipts_writer = EitherWriter::new_receipts(self, first_block)?;
1644
1645        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1646        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1647
1648        // All receipts from the last 128 blocks are required for blockchain tree, even with
1649        // [`PruneSegment::ContractLogs`].
1650        //
1651        // Receipts can only be skipped if we're dealing with legacy nodes that write them to
1652        // Database, OR if receipts_in_static_files is enabled but no receipts exist in static
1653        // files yet. Once receipts exist in static files, we must continue writing to maintain
1654        // continuity and have no gaps.
1655        let prunable_receipts = (EitherWriter::receipts_destination(self).is_database() ||
1656            self.static_file_provider()
1657                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1658                .is_none()) &&
1659            PruneMode::Distance(self.minimum_pruning_distance).should_prune(first_block, tip);
1660
1661        // Prepare set of addresses which logs should not be pruned.
1662        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1663        for (_, addresses) in contract_log_pruner.range(..first_block) {
1664            allowed_addresses.extend(addresses.iter().copied());
1665        }
1666
1667        for (idx, (receipts, first_tx_index)) in
1668            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1669        {
1670            let block_number = first_block + idx as u64;
1671
1672            // Increment block number for receipts static file writer
1673            receipts_writer.increment_block(block_number)?;
1674
1675            // Skip writing receipts if pruning configuration requires us to.
1676            if prunable_receipts &&
1677                self.prune_modes
1678                    .receipts
1679                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1680            {
1681                continue
1682            }
1683
1684            // If there are new addresses to retain after this block number, track them
1685            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1686                allowed_addresses.extend(new_addresses.iter().copied());
1687            }
1688
1689            for (idx, receipt) in receipts.iter().enumerate() {
1690                let receipt_idx = first_tx_index + idx as u64;
1691                // Skip writing receipt if log filter is active and it does not have any logs to
1692                // retain
1693                if prunable_receipts &&
1694                    has_contract_log_filter &&
1695                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1696                {
1697                    continue
1698                }
1699
1700                receipts_writer.append_receipt(receipt_idx, receipt)?;
1701            }
1702        }
1703
1704        Ok(())
1705    }
1706
1707    fn write_state_reverts(
1708        &self,
1709        reverts: PlainStateReverts,
1710        first_block: BlockNumber,
1711    ) -> ProviderResult<()> {
1712        // Write storage changes
1713        tracing::trace!("Writing storage changes");
1714        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1715        let mut storage_changeset_cursor =
1716            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1717        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1718            let block_number = first_block + block_index as BlockNumber;
1719
1720            tracing::trace!(block_number, "Writing block change");
1721            // sort changes by address.
1722            storage_changes.par_sort_unstable_by_key(|a| a.address);
1723            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1724                let storage_id = BlockNumberAddress((block_number, address));
1725
1726                let mut storage = storage_revert
1727                    .into_iter()
1728                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1729                    .collect::<Vec<_>>();
1730                // sort storage slots by key.
1731                storage.par_sort_unstable_by_key(|a| a.0);
1732
1733                // If we are writing the primary storage wipe transition, the pre-existing plain
1734                // storage state has to be taken from the database and written to storage history.
1735                // See [StorageWipe::Primary] for more details.
1736                //
1737                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
1738                // collecting wiped entries into a Vec like this, see
1739                // `write_storage_trie_changesets`.
1740                let mut wiped_storage = Vec::new();
1741                if wiped {
1742                    tracing::trace!(?address, "Wiping storage");
1743                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1744                        wiped_storage.push((entry.key, entry.value));
1745                        while let Some(entry) = storages_cursor.next_dup_val()? {
1746                            wiped_storage.push((entry.key, entry.value))
1747                        }
1748                    }
1749                }
1750
1751                tracing::trace!(?address, ?storage, "Writing storage reverts");
1752                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1753                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1754                }
1755            }
1756        }
1757
1758        // Write account changes
1759        tracing::trace!("Writing account changes");
1760        let mut account_changeset_cursor =
1761            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1762
1763        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1764            let block_number = first_block + block_index as BlockNumber;
1765            // Sort accounts by address.
1766            account_block_reverts.par_sort_by_key(|a| a.0);
1767
1768            for (address, info) in account_block_reverts {
1769                account_changeset_cursor.append_dup(
1770                    block_number,
1771                    AccountBeforeTx { address, info: info.map(Into::into) },
1772                )?;
1773            }
1774        }
1775
1776        Ok(())
1777    }
1778
1779    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1780        // sort all entries so they can be written to database in more performant way.
1781        // and take smaller memory footprint.
1782        changes.accounts.par_sort_by_key(|a| a.0);
1783        changes.storage.par_sort_by_key(|a| a.address);
1784        changes.contracts.par_sort_by_key(|a| a.0);
1785
1786        // Write new account state
1787        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1788        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1789        // write account to database.
1790        for (address, account) in changes.accounts {
1791            if let Some(account) = account {
1792                tracing::trace!(?address, "Updating plain state account");
1793                accounts_cursor.upsert(address, &account.into())?;
1794            } else if accounts_cursor.seek_exact(address)?.is_some() {
1795                tracing::trace!(?address, "Deleting plain state account");
1796                accounts_cursor.delete_current()?;
1797            }
1798        }
1799
1800        // Write bytecode
1801        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1802        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1803        for (hash, bytecode) in changes.contracts {
1804            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1805        }
1806
1807        // Write new storage state and wipe storage if needed.
1808        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1809        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1810        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1811            // Wiping of storage.
1812            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1813                storages_cursor.delete_current_duplicates()?;
1814            }
1815            // cast storages to B256.
1816            let mut storage = storage
1817                .into_iter()
1818                .map(|(k, value)| StorageEntry { key: k.into(), value })
1819                .collect::<Vec<_>>();
1820            // sort storage slots by key.
1821            storage.par_sort_unstable_by_key(|a| a.key);
1822
1823            for entry in storage {
1824                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1825                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1826                    db_entry.key == entry.key
1827                {
1828                    storages_cursor.delete_current()?;
1829                }
1830
1831                if !entry.value.is_zero() {
1832                    storages_cursor.upsert(address, &entry)?;
1833                }
1834            }
1835        }
1836
1837        Ok(())
1838    }
1839
1840    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1841        // Write hashed account updates.
1842        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1843        for (hashed_address, account) in hashed_state.accounts() {
1844            if let Some(account) = account {
1845                hashed_accounts_cursor.upsert(*hashed_address, account)?;
1846            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
1847                hashed_accounts_cursor.delete_current()?;
1848            }
1849        }
1850
1851        // Write hashed storage changes.
1852        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1853        let mut hashed_storage_cursor =
1854            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1855        for (hashed_address, storage) in sorted_storages {
1856            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1857                hashed_storage_cursor.delete_current_duplicates()?;
1858            }
1859
1860            for (hashed_slot, value) in storage.storage_slots_ref() {
1861                let entry = StorageEntry { key: *hashed_slot, value: *value };
1862
1863                if let Some(db_entry) =
1864                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1865                    db_entry.key == entry.key
1866                {
1867                    hashed_storage_cursor.delete_current()?;
1868                }
1869
1870                if !entry.value.is_zero() {
1871                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1872                }
1873            }
1874        }
1875
1876        Ok(())
1877    }
1878
1879    /// Remove the last N blocks of state.
1880    ///
1881    /// The latest state will be unwound
1882    ///
1883    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1884    ///    transaction ids.
1885    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1886    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1887    ///    the changesets.
1888    ///    - In order to have both the old and new values in the changesets, we also access the
1889    ///      plain state tables.
1890    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1891    ///    we:
1892    ///     1. Take the old value from the changeset
1893    ///     2. Take the new value from the plain state
1894    ///     3. Save the old value to the local state
1895    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1896    ///    have seen before we:
1897    ///     1. Take the old value from the changeset
1898    ///     2. Take the new value from the local state
1899    ///     3. Set the local state to the value in the changeset
1900    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1901        let range = block + 1..=self.last_block_number()?;
1902
1903        if range.is_empty() {
1904            return Ok(());
1905        }
1906
1907        // We are not removing block meta as it is used to get block changesets.
1908        let block_bodies = self.block_body_indices_range(range.clone())?;
1909
1910        // get transaction receipts
1911        let from_transaction_num =
1912            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1913
1914        let storage_range = BlockNumberAddress::range(range.clone());
1915
1916        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1917        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1918
1919        // This is not working for blocks that are not at tip. as plain state is not the last
1920        // state of end range. We should rename the functions or add support to access
1921        // History state. Accessing history state can be tricky but we are not gaining
1922        // anything.
1923        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1924        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1925
1926        let (state, _) = self.populate_bundle_state(
1927            account_changeset,
1928            storage_changeset,
1929            &mut plain_accounts_cursor,
1930            &mut plain_storage_cursor,
1931        )?;
1932
1933        // iterate over local plain state remove all account and all storages.
1934        for (address, (old_account, new_account, storage)) in &state {
1935            // revert account if needed.
1936            if old_account != new_account {
1937                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1938                if let Some(account) = old_account {
1939                    plain_accounts_cursor.upsert(*address, account)?;
1940                } else if existing_entry.is_some() {
1941                    plain_accounts_cursor.delete_current()?;
1942                }
1943            }
1944
1945            // revert storages
1946            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1947                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1948                // delete previous value
1949                if plain_storage_cursor
1950                    .seek_by_key_subkey(*address, *storage_key)?
1951                    .filter(|s| s.key == *storage_key)
1952                    .is_some()
1953                {
1954                    plain_storage_cursor.delete_current()?
1955                }
1956
1957                // insert value if needed
1958                if !old_storage_value.is_zero() {
1959                    plain_storage_cursor.upsert(*address, &storage_entry)?;
1960                }
1961            }
1962        }
1963
1964        self.remove_receipts_from(from_transaction_num, block)?;
1965
1966        Ok(())
1967    }
1968
1969    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
1970    ///
1971    /// The latest state will be unwound and returned back with all the blocks
1972    ///
1973    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1974    ///    transaction ids.
1975    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1976    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1977    ///    the changesets.
1978    ///    - In order to have both the old and new values in the changesets, we also access the
1979    ///      plain state tables.
1980    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1981    ///    we:
1982    ///     1. Take the old value from the changeset
1983    ///     2. Take the new value from the plain state
1984    ///     3. Save the old value to the local state
1985    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1986    ///    have seen before we:
1987    ///     1. Take the old value from the changeset
1988    ///     2. Take the new value from the local state
1989    ///     3. Set the local state to the value in the changeset
1990    fn take_state_above(
1991        &self,
1992        block: BlockNumber,
1993    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
1994        let range = block + 1..=self.last_block_number()?;
1995
1996        if range.is_empty() {
1997            return Ok(ExecutionOutcome::default())
1998        }
1999        let start_block_number = *range.start();
2000
2001        // We are not removing block meta as it is used to get block changesets.
2002        let block_bodies = self.block_body_indices_range(range.clone())?;
2003
2004        // get transaction receipts
2005        let from_transaction_num =
2006            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2007        let to_transaction_num =
2008            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2009
2010        let storage_range = BlockNumberAddress::range(range.clone());
2011
2012        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2013        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2014
2015        // This is not working for blocks that are not at tip. as plain state is not the last
2016        // state of end range. We should rename the functions or add support to access
2017        // History state. Accessing history state can be tricky but we are not gaining
2018        // anything.
2019        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2020        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2021
2022        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2023        // remove, and return later
2024        let (state, reverts) = self.populate_bundle_state(
2025            account_changeset,
2026            storage_changeset,
2027            &mut plain_accounts_cursor,
2028            &mut plain_storage_cursor,
2029        )?;
2030
2031        // iterate over local plain state remove all account and all storages.
2032        for (address, (old_account, new_account, storage)) in &state {
2033            // revert account if needed.
2034            if old_account != new_account {
2035                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2036                if let Some(account) = old_account {
2037                    plain_accounts_cursor.upsert(*address, account)?;
2038                } else if existing_entry.is_some() {
2039                    plain_accounts_cursor.delete_current()?;
2040                }
2041            }
2042
2043            // revert storages
2044            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2045                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2046                // delete previous value
2047                if plain_storage_cursor
2048                    .seek_by_key_subkey(*address, *storage_key)?
2049                    .filter(|s| s.key == *storage_key)
2050                    .is_some()
2051                {
2052                    plain_storage_cursor.delete_current()?
2053                }
2054
2055                // insert value if needed
2056                if !old_storage_value.is_zero() {
2057                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2058                }
2059            }
2060        }
2061
2062        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2063        let mut receipts_iter = self
2064            .static_file_provider
2065            .get_range_with_static_file_or_database(
2066                StaticFileSegment::Receipts,
2067                from_transaction_num..to_transaction_num + 1,
2068                |static_file, range, _| {
2069                    static_file
2070                        .receipts_by_tx_range(range.clone())
2071                        .map(|r| range.into_iter().zip(r).collect())
2072                },
2073                |range, _| {
2074                    self.tx
2075                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2076                        .walk_range(range)?
2077                        .map(|r| r.map_err(Into::into))
2078                        .collect()
2079                },
2080                |_| true,
2081            )?
2082            .into_iter()
2083            .peekable();
2084
2085        let mut receipts = Vec::with_capacity(block_bodies.len());
2086        // loop break if we are at the end of the blocks.
2087        for block_body in block_bodies {
2088            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2089            for num in block_body.tx_num_range() {
2090                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2091                    block_receipts.push(receipts_iter.next().unwrap().1);
2092                }
2093            }
2094            receipts.push(block_receipts);
2095        }
2096
2097        self.remove_receipts_from(from_transaction_num, block)?;
2098
2099        Ok(ExecutionOutcome::new_init(
2100            state,
2101            reverts,
2102            Vec::new(),
2103            receipts,
2104            start_block_number,
2105            Vec::new(),
2106        ))
2107    }
2108}
2109
2110impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2111    /// Writes trie updates to the database with already sorted updates.
2112    ///
2113    /// Returns the number of entries modified.
2114    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2115        if trie_updates.is_empty() {
2116            return Ok(0)
2117        }
2118
2119        // Track the number of inserted entries.
2120        let mut num_entries = 0;
2121
2122        let tx = self.tx_ref();
2123        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2124
2125        // Process sorted account nodes
2126        for (key, updated_node) in trie_updates.account_nodes_ref() {
2127            let nibbles = StoredNibbles(*key);
2128            match updated_node {
2129                Some(node) => {
2130                    if !nibbles.0.is_empty() {
2131                        num_entries += 1;
2132                        account_trie_cursor.upsert(nibbles, node)?;
2133                    }
2134                }
2135                None => {
2136                    num_entries += 1;
2137                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2138                        account_trie_cursor.delete_current()?;
2139                    }
2140                }
2141            }
2142        }
2143
2144        num_entries +=
2145            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2146
2147        Ok(num_entries)
2148    }
2149
2150    /// Records the current values of all trie nodes which will be updated using the `TrieUpdates`
2151    /// into the trie changesets tables.
2152    ///
2153    /// The intended usage of this method is to call it _prior_ to calling `write_trie_updates` with
2154    /// the same `TrieUpdates`.
2155    ///
2156    /// Returns the number of keys written.
2157    fn write_trie_changesets(
2158        &self,
2159        block_number: BlockNumber,
2160        trie_updates: &TrieUpdatesSorted,
2161        updates_overlay: Option<&TrieUpdatesSorted>,
2162    ) -> ProviderResult<usize> {
2163        let mut num_entries = 0;
2164
2165        let mut changeset_cursor =
2166            self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2167        let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2168
2169        // Wrap the cursor in DatabaseAccountTrieCursor
2170        let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2171
2172        // Create empty TrieUpdatesSorted for when updates_overlay is None
2173        let empty_updates = TrieUpdatesSorted::default();
2174        let overlay = updates_overlay.unwrap_or(&empty_updates);
2175
2176        // Wrap the cursor in InMemoryTrieCursor with the overlay
2177        let mut in_memory_account_cursor =
2178            InMemoryTrieCursor::new_account(&mut db_account_cursor, overlay);
2179
2180        for (path, _) in trie_updates.account_nodes_ref() {
2181            num_entries += 1;
2182            let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2183            changeset_cursor.append_dup(
2184                block_number,
2185                TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2186            )?;
2187        }
2188
2189        let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2190        storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2191
2192        num_entries += self.write_storage_trie_changesets(
2193            block_number,
2194            storage_updates.into_iter(),
2195            updates_overlay,
2196        )?;
2197
2198        Ok(num_entries)
2199    }
2200
2201    fn clear_trie_changesets(&self) -> ProviderResult<()> {
2202        let tx = self.tx_ref();
2203        tx.clear::<tables::AccountsTrieChangeSets>()?;
2204        tx.clear::<tables::StoragesTrieChangeSets>()?;
2205        Ok(())
2206    }
2207
2208    fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2209        let tx = self.tx_ref();
2210        {
2211            let range = from..;
2212            let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2213            let mut walker = cursor.walk_range(range)?;
2214
2215            while walker.next().transpose()?.is_some() {
2216                walker.delete_current()?;
2217            }
2218        }
2219
2220        {
2221            let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2222            let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2223            let mut walker = cursor.walk_range(range)?;
2224
2225            while walker.next().transpose()?.is_some() {
2226                walker.delete_current()?;
2227            }
2228        }
2229
2230        Ok(())
2231    }
2232}
2233
2234impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2235    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2236        let tx = self.tx_ref();
2237
2238        // Read account trie changes directly into a Vec - data is already sorted by nibbles
2239        // within each block, and we want the oldest (first) version of each node sorted by path.
2240        let mut account_nodes = Vec::new();
2241        let mut seen_account_keys = HashSet::new();
2242        let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2243
2244        for entry in accounts_cursor.walk_range(from..)? {
2245            let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2246            // Only keep the first (oldest) version of each node
2247            if seen_account_keys.insert(nibbles.0) {
2248                account_nodes.push((nibbles.0, node));
2249            }
2250        }
2251
2252        account_nodes.sort_by_key(|(path, _)| *path);
2253
2254        // Read storage trie changes - data is sorted by (block, hashed_address, nibbles)
2255        // Keep track of seen (address, nibbles) pairs to only keep the oldest version per address,
2256        // sorted by path.
2257        let mut storage_tries = B256Map::<Vec<_>>::default();
2258        let mut seen_storage_keys = HashSet::new();
2259        let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2260
2261        // Create storage range starting from `from` block
2262        let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2263
2264        for entry in storages_cursor.walk_range(storage_range_start..)? {
2265            let (
2266                BlockNumberHashedAddress((_, hashed_address)),
2267                TrieChangeSetsEntry { nibbles, node },
2268            ) = entry?;
2269
2270            // Only keep the first (oldest) version of each node for this address
2271            if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2272                storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2273            }
2274        }
2275
2276        // Convert to StorageTrieUpdatesSorted
2277        let storage_tries = storage_tries
2278            .into_iter()
2279            .map(|(address, mut nodes)| {
2280                nodes.sort_by_key(|(path, _)| *path);
2281                (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2282            })
2283            .collect();
2284
2285        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2286    }
2287
2288    fn get_block_trie_updates(
2289        &self,
2290        block_number: BlockNumber,
2291    ) -> ProviderResult<TrieUpdatesSorted> {
2292        let tx = self.tx_ref();
2293
2294        // Step 1: Get the trie reverts for the state after the target block
2295        let reverts = self.trie_reverts(block_number + 1)?;
2296
2297        // Step 2: Create an InMemoryTrieCursorFactory with the reverts
2298        // This gives us the trie state as it was after the target block was processed
2299        let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2300        let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2301
2302        // Step 3: Collect all account trie nodes that changed in the target block
2303        let mut account_nodes = Vec::new();
2304
2305        // Walk through all account trie changes for this block
2306        let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2307        let mut account_cursor = cursor_factory.account_trie_cursor()?;
2308
2309        for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2310            let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2311            // Look up the current value of this trie node using the overlay cursor
2312            let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2313            account_nodes.push((nibbles.0, node_value));
2314        }
2315
2316        // Step 4: Collect all storage trie nodes that changed in the target block
2317        let mut storage_tries = B256Map::default();
2318        let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2319        let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2320        let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2321
2322        let mut current_hashed_address = None;
2323        let mut storage_cursor = None;
2324
2325        for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2326            let (
2327                BlockNumberHashedAddress((_, hashed_address)),
2328                TrieChangeSetsEntry { nibbles, .. },
2329            ) = entry?;
2330
2331            // Check if we need to create a new storage cursor for a different account
2332            if current_hashed_address != Some(hashed_address) {
2333                storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2334                current_hashed_address = Some(hashed_address);
2335            }
2336
2337            // Look up the current value of this storage trie node
2338            let cursor =
2339                storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2340            let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2341            storage_tries
2342                .entry(hashed_address)
2343                .or_insert_with(|| StorageTrieUpdatesSorted {
2344                    storage_nodes: Vec::new(),
2345                    is_deleted: false,
2346                })
2347                .storage_nodes
2348                .push((nibbles.0, node_value));
2349        }
2350
2351        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2352    }
2353}
2354
2355impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2356    /// Writes storage trie updates from the given storage trie map with already sorted updates.
2357    ///
2358    /// Expects the storage trie updates to already be sorted by the hashed address key.
2359    ///
2360    /// Returns the number of entries modified.
2361    fn write_storage_trie_updates_sorted<'a>(
2362        &self,
2363        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2364    ) -> ProviderResult<usize> {
2365        let mut num_entries = 0;
2366        let mut storage_tries = storage_tries.collect::<Vec<_>>();
2367        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2368        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2369        for (hashed_address, storage_trie_updates) in storage_tries {
2370            let mut db_storage_trie_cursor =
2371                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2372            num_entries +=
2373                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2374            cursor = db_storage_trie_cursor.cursor;
2375        }
2376
2377        Ok(num_entries)
2378    }
2379
2380    /// Records the current values of all trie nodes which will be updated using the
2381    /// `StorageTrieUpdates` into the storage trie changesets table.
2382    ///
2383    /// The intended usage of this method is to call it _prior_ to calling
2384    /// `write_storage_trie_updates` with the same set of `StorageTrieUpdates`.
2385    ///
2386    /// Returns the number of keys written.
2387    fn write_storage_trie_changesets<'a>(
2388        &self,
2389        block_number: BlockNumber,
2390        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2391        updates_overlay: Option<&TrieUpdatesSorted>,
2392    ) -> ProviderResult<usize> {
2393        let mut num_written = 0;
2394
2395        let mut changeset_cursor =
2396            self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2397
2398        // We hold two cursors to the same table because we use them simultaneously when an
2399        // account's storage is wiped. We keep them outside the for-loop so they can be re-used
2400        // between accounts.
2401        let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2402        let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2403
2404        // DatabaseStorageTrieCursor requires ownership of the cursor. The easiest way to deal with
2405        // this is to create this outer variable with an initial dummy account, and overwrite it on
2406        // every loop for every real account.
2407        let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
2408            changed_curr_values_cursor,
2409            B256::default(), // Will be set per iteration
2410        );
2411        let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
2412            wiped_nodes_cursor,
2413            B256::default(), // Will be set per iteration
2414        );
2415
2416        // Create empty TrieUpdatesSorted for when updates_overlay is None
2417        let empty_updates = TrieUpdatesSorted::default();
2418
2419        for (hashed_address, storage_trie_updates) in storage_tries {
2420            let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2421
2422            // Update the hashed address for the cursors
2423            changed_curr_values_cursor =
2424                DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2425
2426            // Get the overlay updates, or use empty updates
2427            let overlay = updates_overlay.unwrap_or(&empty_updates);
2428
2429            // Wrap the cursor in InMemoryTrieCursor with the overlay
2430            let mut in_memory_changed_cursor = InMemoryTrieCursor::new_storage(
2431                &mut changed_curr_values_cursor,
2432                overlay,
2433                *hashed_address,
2434            );
2435
2436            // Create an iterator which produces the current values of all updated paths, or None if
2437            // they are currently unset.
2438            let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2439                storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2440                &mut in_memory_changed_cursor,
2441            )?;
2442
2443            if storage_trie_updates.is_deleted() {
2444                // Create an iterator that starts from the beginning of the storage trie for this
2445                // account
2446                wiped_nodes_cursor =
2447                    DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2448
2449                // Wrap the wiped nodes cursor in InMemoryTrieCursor with the overlay
2450                let mut in_memory_wiped_cursor = InMemoryTrieCursor::new_storage(
2451                    &mut wiped_nodes_cursor,
2452                    overlay,
2453                    *hashed_address,
2454                );
2455
2456                let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2457
2458                for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2459                    let (path, node) = wiped?;
2460                    num_written += 1;
2461                    changeset_cursor.append_dup(
2462                        changeset_key,
2463                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2464                    )?;
2465                }
2466            } else {
2467                for curr_value in curr_values_of_changed {
2468                    let (path, node) = curr_value?;
2469                    num_written += 1;
2470                    changeset_cursor.append_dup(
2471                        changeset_key,
2472                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2473                    )?;
2474                }
2475            }
2476        }
2477
2478        Ok(num_written)
2479    }
2480}
2481
2482impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2483    fn unwind_account_hashing<'a>(
2484        &self,
2485        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2486    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2487        // Aggregate all block changesets and make a list of accounts that have been changed.
2488        // Note that collecting and then reversing the order is necessary to ensure that the
2489        // changes are applied in the correct order.
2490        let hashed_accounts = changesets
2491            .into_iter()
2492            .map(|(_, e)| (keccak256(e.address), e.info))
2493            .collect::<Vec<_>>()
2494            .into_iter()
2495            .rev()
2496            .collect::<BTreeMap<_, _>>();
2497
2498        // Apply values to HashedState, and remove the account if it's None.
2499        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2500        for (hashed_address, account) in &hashed_accounts {
2501            if let Some(account) = account {
2502                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2503            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2504                hashed_accounts_cursor.delete_current()?;
2505            }
2506        }
2507
2508        Ok(hashed_accounts)
2509    }
2510
2511    fn unwind_account_hashing_range(
2512        &self,
2513        range: impl RangeBounds<BlockNumber>,
2514    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2515        let changesets = self
2516            .tx
2517            .cursor_read::<tables::AccountChangeSets>()?
2518            .walk_range(range)?
2519            .collect::<Result<Vec<_>, _>>()?;
2520        self.unwind_account_hashing(changesets.iter())
2521    }
2522
2523    fn insert_account_for_hashing(
2524        &self,
2525        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2526    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2527        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2528        let hashed_accounts =
2529            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2530        for (hashed_address, account) in &hashed_accounts {
2531            if let Some(account) = account {
2532                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2533            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2534                hashed_accounts_cursor.delete_current()?;
2535            }
2536        }
2537        Ok(hashed_accounts)
2538    }
2539
2540    fn unwind_storage_hashing(
2541        &self,
2542        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2543    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2544        // Aggregate all block changesets and make list of accounts that have been changed.
2545        let mut hashed_storages = changesets
2546            .into_iter()
2547            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2548                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2549            })
2550            .collect::<Vec<_>>();
2551        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2552
2553        // Apply values to HashedState, and remove the account if it's None.
2554        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2555            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2556        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2557        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2558            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2559
2560            if hashed_storage
2561                .seek_by_key_subkey(hashed_address, key)?
2562                .filter(|entry| entry.key == key)
2563                .is_some()
2564            {
2565                hashed_storage.delete_current()?;
2566            }
2567
2568            if !value.is_zero() {
2569                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2570            }
2571        }
2572        Ok(hashed_storage_keys)
2573    }
2574
2575    fn unwind_storage_hashing_range(
2576        &self,
2577        range: impl RangeBounds<BlockNumberAddress>,
2578    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2579        let changesets = self
2580            .tx
2581            .cursor_read::<tables::StorageChangeSets>()?
2582            .walk_range(range)?
2583            .collect::<Result<Vec<_>, _>>()?;
2584        self.unwind_storage_hashing(changesets.into_iter())
2585    }
2586
2587    fn insert_storage_for_hashing(
2588        &self,
2589        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2590    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2591        // hash values
2592        let hashed_storages =
2593            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2594                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2595                    map.insert(keccak256(entry.key), entry.value);
2596                    map
2597                });
2598                map.insert(keccak256(address), storage);
2599                map
2600            });
2601
2602        let hashed_storage_keys = hashed_storages
2603            .iter()
2604            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2605            .collect();
2606
2607        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2608        // Hash the address and key and apply them to HashedStorage (if Storage is None
2609        // just remove it);
2610        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2611            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2612                if hashed_storage_cursor
2613                    .seek_by_key_subkey(hashed_address, key)?
2614                    .filter(|entry| entry.key == key)
2615                    .is_some()
2616                {
2617                    hashed_storage_cursor.delete_current()?;
2618                }
2619
2620                if !value.is_zero() {
2621                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2622                }
2623                Ok(())
2624            })
2625        })?;
2626
2627        Ok(hashed_storage_keys)
2628    }
2629}
2630
2631impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2632    fn unwind_account_history_indices<'a>(
2633        &self,
2634        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2635    ) -> ProviderResult<usize> {
2636        let mut last_indices = changesets
2637            .into_iter()
2638            .map(|(index, account)| (account.address, *index))
2639            .collect::<Vec<_>>();
2640        last_indices.sort_by_key(|(a, _)| *a);
2641
2642        // Unwind the account history index.
2643        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2644        for &(address, rem_index) in &last_indices {
2645            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2646                &mut cursor,
2647                ShardedKey::last(address),
2648                rem_index,
2649                |sharded_key| sharded_key.key == address,
2650            )?;
2651
2652            // Check the last returned partial shard.
2653            // If it's not empty, the shard needs to be reinserted.
2654            if !partial_shard.is_empty() {
2655                cursor.insert(
2656                    ShardedKey::last(address),
2657                    &BlockNumberList::new_pre_sorted(partial_shard),
2658                )?;
2659            }
2660        }
2661
2662        let changesets = last_indices.len();
2663        Ok(changesets)
2664    }
2665
2666    fn unwind_account_history_indices_range(
2667        &self,
2668        range: impl RangeBounds<BlockNumber>,
2669    ) -> ProviderResult<usize> {
2670        let changesets = self
2671            .tx
2672            .cursor_read::<tables::AccountChangeSets>()?
2673            .walk_range(range)?
2674            .collect::<Result<Vec<_>, _>>()?;
2675        self.unwind_account_history_indices(changesets.iter())
2676    }
2677
2678    fn insert_account_history_index(
2679        &self,
2680        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2681    ) -> ProviderResult<()> {
2682        self.append_history_index::<_, tables::AccountsHistory>(
2683            account_transitions,
2684            ShardedKey::new,
2685        )
2686    }
2687
2688    fn unwind_storage_history_indices(
2689        &self,
2690        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2691    ) -> ProviderResult<usize> {
2692        let mut storage_changesets = changesets
2693            .into_iter()
2694            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2695            .collect::<Vec<_>>();
2696        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2697
2698        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2699        for &(address, storage_key, rem_index) in &storage_changesets {
2700            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2701                &mut cursor,
2702                StorageShardedKey::last(address, storage_key),
2703                rem_index,
2704                |storage_sharded_key| {
2705                    storage_sharded_key.address == address &&
2706                        storage_sharded_key.sharded_key.key == storage_key
2707                },
2708            )?;
2709
2710            // Check the last returned partial shard.
2711            // If it's not empty, the shard needs to be reinserted.
2712            if !partial_shard.is_empty() {
2713                cursor.insert(
2714                    StorageShardedKey::last(address, storage_key),
2715                    &BlockNumberList::new_pre_sorted(partial_shard),
2716                )?;
2717            }
2718        }
2719
2720        let changesets = storage_changesets.len();
2721        Ok(changesets)
2722    }
2723
2724    fn unwind_storage_history_indices_range(
2725        &self,
2726        range: impl RangeBounds<BlockNumberAddress>,
2727    ) -> ProviderResult<usize> {
2728        let changesets = self
2729            .tx
2730            .cursor_read::<tables::StorageChangeSets>()?
2731            .walk_range(range)?
2732            .collect::<Result<Vec<_>, _>>()?;
2733        self.unwind_storage_history_indices(changesets.into_iter())
2734    }
2735
2736    fn insert_storage_history_index(
2737        &self,
2738        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2739    ) -> ProviderResult<()> {
2740        self.append_history_index::<_, tables::StoragesHistory>(
2741            storage_transitions,
2742            |(address, storage_key), highest_block_number| {
2743                StorageShardedKey::new(address, storage_key, highest_block_number)
2744            },
2745        )
2746    }
2747
2748    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2749        // account history stage
2750        {
2751            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2752            self.insert_account_history_index(indices)?;
2753        }
2754
2755        // storage history stage
2756        {
2757            let indices = self.changed_storages_and_blocks_with_range(range)?;
2758            self.insert_storage_history_index(indices)?;
2759        }
2760
2761        Ok(())
2762    }
2763}
2764
2765impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2766    for DatabaseProvider<TX, N>
2767{
2768    fn take_block_and_execution_above(
2769        &self,
2770        block: BlockNumber,
2771    ) -> ProviderResult<Chain<Self::Primitives>> {
2772        let range = block + 1..=self.last_block_number()?;
2773
2774        self.unwind_trie_state_from(block + 1)?;
2775
2776        // get execution res
2777        let execution_state = self.take_state_above(block)?;
2778
2779        let blocks = self.recovered_block_range(range)?;
2780
2781        // remove block bodies it is needed for both get block range and get block execution results
2782        // that is why it is deleted afterwards.
2783        self.remove_blocks_above(block)?;
2784
2785        // Update pipeline progress
2786        self.update_pipeline_stages(block, true)?;
2787
2788        Ok(Chain::new(blocks, execution_state, None))
2789    }
2790
2791    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2792        self.unwind_trie_state_from(block + 1)?;
2793
2794        // remove execution res
2795        self.remove_state_above(block)?;
2796
2797        // remove block bodies it is needed for both get block range and get block execution results
2798        // that is why it is deleted afterwards.
2799        self.remove_blocks_above(block)?;
2800
2801        // Update pipeline progress
2802        self.update_pipeline_stages(block, true)?;
2803
2804        Ok(())
2805    }
2806}
2807
2808impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2809    for DatabaseProvider<TX, N>
2810{
2811    type Block = BlockTy<N>;
2812    type Receipt = ReceiptTy<N>;
2813
2814    /// Inserts the block into the database, always modifying the following static file segments and
2815    /// tables:
2816    /// * [`StaticFileSegment::Headers`]
2817    /// * [`tables::HeaderNumbers`]
2818    /// * [`tables::BlockBodyIndices`]
2819    ///
2820    /// If there are transactions in the block, the following static file segments and tables will
2821    /// be modified:
2822    /// * [`StaticFileSegment::Transactions`]
2823    /// * [`tables::TransactionBlocks`]
2824    ///
2825    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2826    /// If withdrawals are not empty, this will modify
2827    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2828    ///
2829    /// If the provider has __not__ configured full sender pruning, this will modify either:
2830    /// * [`StaticFileSegment::TransactionSenders`] if senders are written to static files
2831    /// * [`tables::TransactionSenders`] if senders are written to the database
2832    ///
2833    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2834    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2835    fn insert_block(
2836        &self,
2837        block: RecoveredBlock<Self::Block>,
2838    ) -> ProviderResult<StoredBlockBodyIndices> {
2839        let block_number = block.number();
2840        let tx_count = block.body().transaction_count() as u64;
2841
2842        let mut durations_recorder = metrics::DurationsRecorder::default();
2843
2844        self.static_file_provider
2845            .get_writer(block_number, StaticFileSegment::Headers)?
2846            .append_header(block.header(), &block.hash())?;
2847
2848        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2849        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2850
2851        let first_tx_num = self
2852            .tx
2853            .cursor_read::<tables::TransactionBlocks>()?
2854            .last()?
2855            .map(|(n, _)| n + 1)
2856            .unwrap_or_default();
2857        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2858
2859        let tx_nums_iter = std::iter::successors(Some(first_tx_num), |n| Some(n + 1));
2860
2861        if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2862            let mut senders_writer = EitherWriter::new_senders(self, block.number())?;
2863            senders_writer.increment_block(block.number())?;
2864            senders_writer
2865                .append_senders(tx_nums_iter.clone().zip(block.senders_iter().copied()))?;
2866            durations_recorder.record_relative(metrics::Action::InsertTransactionSenders);
2867        }
2868
2869        if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2870            for (tx_num, transaction) in tx_nums_iter.zip(block.body().transactions_iter()) {
2871                let hash = transaction.tx_hash();
2872                self.tx.put::<tables::TransactionHashNumbers>(*hash, tx_num)?;
2873            }
2874            durations_recorder.record_relative(metrics::Action::InsertTransactionHashNumbers);
2875        }
2876
2877        self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2878
2879        debug!(
2880            target: "providers::db",
2881            ?block_number,
2882            actions = ?durations_recorder.actions,
2883            "Inserted block"
2884        );
2885
2886        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2887    }
2888
2889    fn append_block_bodies(
2890        &self,
2891        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2892    ) -> ProviderResult<()> {
2893        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2894
2895        // Initialize writer if we will be writing transactions to staticfiles
2896        let mut tx_writer =
2897            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2898
2899        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2900        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2901
2902        // Get id for the next tx_num or zero if there are no transactions.
2903        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2904
2905        for (block_number, body) in &bodies {
2906            // Increment block on static file header.
2907            tx_writer.increment_block(*block_number)?;
2908
2909            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2910            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2911
2912            let mut durations_recorder = metrics::DurationsRecorder::default();
2913
2914            // insert block meta
2915            block_indices_cursor.append(*block_number, &block_indices)?;
2916
2917            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2918
2919            let Some(body) = body else { continue };
2920
2921            // write transaction block index
2922            if !body.transactions().is_empty() {
2923                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2924                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2925            }
2926
2927            // write transactions
2928            for transaction in body.transactions() {
2929                tx_writer.append_transaction(next_tx_num, transaction)?;
2930
2931                // Increment transaction id for each transaction.
2932                next_tx_num += 1;
2933            }
2934        }
2935
2936        self.storage.writer().write_block_bodies(self, bodies)?;
2937
2938        Ok(())
2939    }
2940
2941    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2942        let last_block_number = self.last_block_number()?;
2943        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
2944        for hash in self.canonical_hashes_range(block + 1, last_block_number + 1)? {
2945            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2946        }
2947
2948        // Get highest static file block for the total block range
2949        let highest_static_file_block = self
2950            .static_file_provider()
2951            .get_highest_static_file_block(StaticFileSegment::Headers)
2952            .expect("todo: error handling, headers should exist");
2953
2954        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
2955        // we remove only what is ABOVE the block.
2956        //
2957        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
2958        // will have three blocks to remove, which will be block 8, 7, and 6.
2959        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2960        self.static_file_provider()
2961            .get_writer(block, StaticFileSegment::Headers)?
2962            .prune_headers(highest_static_file_block.saturating_sub(block))?;
2963
2964        // First transaction to be removed
2965        let unwind_tx_from = self
2966            .block_body_indices(block)?
2967            .map(|b| b.next_tx_num())
2968            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2969
2970        // Last transaction to be removed
2971        let unwind_tx_to = self
2972            .tx
2973            .cursor_read::<tables::BlockBodyIndices>()?
2974            .last()?
2975            // shouldn't happen because this was OK above
2976            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2977            .1
2978            .last_tx_num();
2979
2980        if unwind_tx_from <= unwind_tx_to {
2981            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2982                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2983            }
2984        }
2985
2986        EitherWriter::new_senders(self, last_block_number)?.prune_senders(unwind_tx_from, block)?;
2987
2988        self.remove_bodies_above(block)?;
2989
2990        Ok(())
2991    }
2992
2993    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2994        self.storage.writer().remove_block_bodies_above(self, block)?;
2995
2996        // First transaction to be removed
2997        let unwind_tx_from = self
2998            .block_body_indices(block)?
2999            .map(|b| b.next_tx_num())
3000            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3001
3002        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3003        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3004
3005        let static_file_tx_num =
3006            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
3007
3008        let to_delete = static_file_tx_num
3009            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3010            .unwrap_or_default();
3011
3012        self.static_file_provider
3013            .latest_writer(StaticFileSegment::Transactions)?
3014            .prune_transactions(to_delete, block)?;
3015
3016        Ok(())
3017    }
3018
3019    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3020    fn append_blocks_with_state(
3021        &self,
3022        blocks: Vec<RecoveredBlock<Self::Block>>,
3023        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3024        hashed_state: HashedPostStateSorted,
3025    ) -> ProviderResult<()> {
3026        if blocks.is_empty() {
3027            debug!(target: "providers::db", "Attempted to append empty block range");
3028            return Ok(())
3029        }
3030
3031        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3032        // `None`.
3033        let first_number = blocks[0].number();
3034
3035        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3036        // `None`.
3037        let last_block_number = blocks[blocks.len() - 1].number();
3038
3039        let mut durations_recorder = metrics::DurationsRecorder::default();
3040
3041        // Insert the blocks
3042        for block in blocks {
3043            self.insert_block(block)?;
3044            durations_recorder.record_relative(metrics::Action::InsertBlock);
3045        }
3046
3047        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
3048        durations_recorder.record_relative(metrics::Action::InsertState);
3049
3050        // insert hashes and intermediate merkle nodes
3051        self.write_hashed_state(&hashed_state)?;
3052        durations_recorder.record_relative(metrics::Action::InsertHashes);
3053
3054        self.update_history_indices(first_number..=last_block_number)?;
3055        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3056
3057        // Update pipeline progress
3058        self.update_pipeline_stages(last_block_number, false)?;
3059        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3060
3061        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3062
3063        Ok(())
3064    }
3065}
3066
3067impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3068    fn get_prune_checkpoint(
3069        &self,
3070        segment: PruneSegment,
3071    ) -> ProviderResult<Option<PruneCheckpoint>> {
3072        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3073    }
3074
3075    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3076        Ok(PruneSegment::variants()
3077            .filter_map(|segment| {
3078                self.tx
3079                    .get::<tables::PruneCheckpoints>(segment)
3080                    .transpose()
3081                    .map(|chk| chk.map(|chk| (segment, chk)))
3082            })
3083            .collect::<Result<_, _>>()?)
3084    }
3085}
3086
3087impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3088    fn save_prune_checkpoint(
3089        &self,
3090        segment: PruneSegment,
3091        checkpoint: PruneCheckpoint,
3092    ) -> ProviderResult<()> {
3093        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3094    }
3095}
3096
3097impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3098    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3099        let db_entries = self.tx.entries::<T>()?;
3100        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3101            Ok(entries) => entries,
3102            Err(ProviderError::UnsupportedProvider) => 0,
3103            Err(err) => return Err(err),
3104        };
3105
3106        Ok(db_entries + static_file_entries)
3107    }
3108}
3109
3110impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3111    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3112        let mut finalized_blocks = self
3113            .tx
3114            .cursor_read::<tables::ChainState>()?
3115            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3116            .take(1)
3117            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3118
3119        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3120        Ok(last_finalized_block_number)
3121    }
3122
3123    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3124        let mut finalized_blocks = self
3125            .tx
3126            .cursor_read::<tables::ChainState>()?
3127            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3128            .take(1)
3129            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3130
3131        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3132        Ok(last_finalized_block_number)
3133    }
3134}
3135
3136impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3137    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3138        Ok(self
3139            .tx
3140            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3141    }
3142
3143    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3144        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3145    }
3146}
3147
3148impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3149    type Tx = TX;
3150
3151    fn tx_ref(&self) -> &Self::Tx {
3152        &self.tx
3153    }
3154
3155    fn tx_mut(&mut self) -> &mut Self::Tx {
3156        &mut self.tx
3157    }
3158
3159    fn into_tx(self) -> Self::Tx {
3160        self.tx
3161    }
3162
3163    fn prune_modes_ref(&self) -> &PruneModes {
3164        self.prune_modes_ref()
3165    }
3166
3167    /// Commit database transaction and static files.
3168    fn commit(self) -> ProviderResult<bool> {
3169        // For unwinding it makes more sense to commit the database first, since if
3170        // it is interrupted before the static files commit, we can just
3171        // truncate the static files according to the
3172        // checkpoints on the next start-up.
3173        if self.static_file_provider.has_unwind_queued() {
3174            self.tx.commit()?;
3175            self.static_file_provider.commit()?;
3176        } else {
3177            self.static_file_provider.commit()?;
3178            self.tx.commit()?;
3179        }
3180
3181        Ok(true)
3182    }
3183}
3184
3185impl<TX: DbTx, N: NodeTypes> MetadataProvider for DatabaseProvider<TX, N> {
3186    fn get_metadata(&self, key: &str) -> ProviderResult<Option<Vec<u8>>> {
3187        self.tx.get::<tables::Metadata>(key.to_string()).map_err(Into::into)
3188    }
3189}
3190
3191impl<TX: DbTxMut, N: NodeTypes> MetadataWriter for DatabaseProvider<TX, N> {
3192    fn write_metadata(&self, key: &str, value: Vec<u8>) -> ProviderResult<()> {
3193        self.tx.put::<tables::Metadata>(key.to_string(), value).map_err(Into::into)
3194    }
3195}
3196
3197impl<TX: Send + Sync, N: NodeTypes> StorageSettingsCache for DatabaseProvider<TX, N> {
3198    fn cached_storage_settings(&self) -> StorageSettings {
3199        *self.storage_settings.read()
3200    }
3201
3202    fn set_storage_settings_cache(&self, settings: StorageSettings) {
3203        *self.storage_settings.write() = settings;
3204    }
3205}
3206
3207#[cfg(test)]
3208mod tests {
3209    use super::*;
3210    use crate::{
3211        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3212        BlockWriter,
3213    };
3214    use reth_ethereum_primitives::Receipt;
3215    use reth_testing_utils::generators::{self, random_block, BlockParams};
3216    use reth_trie::Nibbles;
3217
3218    #[test]
3219    fn test_receipts_by_block_range_empty_range() {
3220        let factory = create_test_provider_factory();
3221        let provider = factory.provider().unwrap();
3222
3223        // empty range should return empty vec
3224        let start = 10u64;
3225        let end = 9u64;
3226        let result = provider.receipts_by_block_range(start..=end).unwrap();
3227        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3228    }
3229
3230    #[test]
3231    fn test_receipts_by_block_range_nonexistent_blocks() {
3232        let factory = create_test_provider_factory();
3233        let provider = factory.provider().unwrap();
3234
3235        // non-existent blocks should return empty vecs for each block
3236        let result = provider.receipts_by_block_range(10..=12).unwrap();
3237        assert_eq!(result, vec![vec![], vec![], vec![]]);
3238    }
3239
3240    #[test]
3241    fn test_receipts_by_block_range_single_block() {
3242        let factory = create_test_provider_factory();
3243        let data = BlockchainTestData::default();
3244
3245        let provider_rw = factory.provider_rw().unwrap();
3246        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3247        provider_rw
3248            .write_state(
3249                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3250                crate::OriginalValuesKnown::No,
3251            )
3252            .unwrap();
3253        provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
3254        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3255        provider_rw.commit().unwrap();
3256
3257        let provider = factory.provider().unwrap();
3258        let result = provider.receipts_by_block_range(1..=1).unwrap();
3259
3260        // should have one vec with one receipt
3261        assert_eq!(result.len(), 1);
3262        assert_eq!(result[0].len(), 1);
3263        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3264    }
3265
3266    #[test]
3267    fn test_receipts_by_block_range_multiple_blocks() {
3268        let factory = create_test_provider_factory();
3269        let data = BlockchainTestData::default();
3270
3271        let provider_rw = factory.provider_rw().unwrap();
3272        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3273        provider_rw
3274            .write_state(
3275                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3276                crate::OriginalValuesKnown::No,
3277            )
3278            .unwrap();
3279        for i in 0..3 {
3280            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3281            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3282        }
3283        provider_rw.commit().unwrap();
3284
3285        let provider = factory.provider().unwrap();
3286        let result = provider.receipts_by_block_range(1..=3).unwrap();
3287
3288        // should have 3 vecs, each with one receipt
3289        assert_eq!(result.len(), 3);
3290        for (i, block_receipts) in result.iter().enumerate() {
3291            assert_eq!(block_receipts.len(), 1);
3292            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3293        }
3294    }
3295
3296    #[test]
3297    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3298        let factory = create_test_provider_factory();
3299        let data = BlockchainTestData::default();
3300
3301        let provider_rw = factory.provider_rw().unwrap();
3302        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3303        provider_rw
3304            .write_state(
3305                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3306                crate::OriginalValuesKnown::No,
3307            )
3308            .unwrap();
3309
3310        // insert blocks 1-3 with receipts
3311        for i in 0..3 {
3312            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3313            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3314        }
3315        provider_rw.commit().unwrap();
3316
3317        let provider = factory.provider().unwrap();
3318        let result = provider.receipts_by_block_range(1..=3).unwrap();
3319
3320        // verify each block has one receipt
3321        assert_eq!(result.len(), 3);
3322        for block_receipts in &result {
3323            assert_eq!(block_receipts.len(), 1);
3324        }
3325    }
3326
3327    #[test]
3328    fn test_receipts_by_block_range_partial_range() {
3329        let factory = create_test_provider_factory();
3330        let data = BlockchainTestData::default();
3331
3332        let provider_rw = factory.provider_rw().unwrap();
3333        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3334        provider_rw
3335            .write_state(
3336                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3337                crate::OriginalValuesKnown::No,
3338            )
3339            .unwrap();
3340        for i in 0..3 {
3341            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3342            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3343        }
3344        provider_rw.commit().unwrap();
3345
3346        let provider = factory.provider().unwrap();
3347
3348        // request range that includes both existing and non-existing blocks
3349        let result = provider.receipts_by_block_range(2..=5).unwrap();
3350        assert_eq!(result.len(), 4);
3351
3352        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3353        assert_eq!(result[0].len(), 1); // block 2
3354        assert_eq!(result[1].len(), 1); // block 3
3355        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3356        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3357
3358        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3359        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3360    }
3361
3362    #[test]
3363    fn test_receipts_by_block_range_all_empty_blocks() {
3364        let factory = create_test_provider_factory();
3365        let mut rng = generators::rng();
3366
3367        // create blocks with no transactions
3368        let mut blocks = Vec::new();
3369        for i in 0..3 {
3370            let block =
3371                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3372            blocks.push(block);
3373        }
3374
3375        let provider_rw = factory.provider_rw().unwrap();
3376        for block in blocks {
3377            provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3378        }
3379        provider_rw.commit().unwrap();
3380
3381        let provider = factory.provider().unwrap();
3382        let result = provider.receipts_by_block_range(1..=3).unwrap();
3383
3384        assert_eq!(result.len(), 3);
3385        for block_receipts in result {
3386            assert_eq!(block_receipts.len(), 0);
3387        }
3388    }
3389
3390    #[test]
3391    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3392        let factory = create_test_provider_factory();
3393        let data = BlockchainTestData::default();
3394
3395        let provider_rw = factory.provider_rw().unwrap();
3396        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3397        provider_rw
3398            .write_state(
3399                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3400                crate::OriginalValuesKnown::No,
3401            )
3402            .unwrap();
3403        for i in 0..3 {
3404            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3405            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3406        }
3407        provider_rw.commit().unwrap();
3408
3409        let provider = factory.provider().unwrap();
3410
3411        // get receipts using block range method
3412        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3413
3414        // get receipts using individual block calls
3415        let mut individual_results = Vec::new();
3416        for block_num in 1..=3 {
3417            let receipts =
3418                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3419            individual_results.push(receipts);
3420        }
3421
3422        assert_eq!(range_result, individual_results);
3423    }
3424
3425    #[test]
3426    fn test_write_trie_changesets() {
3427        use reth_db_api::models::BlockNumberHashedAddress;
3428        use reth_trie::{BranchNodeCompact, StorageTrieEntry};
3429
3430        let factory = create_test_provider_factory();
3431        let provider_rw = factory.provider_rw().unwrap();
3432
3433        let block_number = 1u64;
3434
3435        // Create some test nibbles and nodes
3436        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3437        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3438
3439        let node1 = BranchNodeCompact::new(
3440            0b1111_1111_1111_1111, // state_mask
3441            0b0000_0000_0000_0000, // tree_mask
3442            0b0000_0000_0000_0000, // hash_mask
3443            vec![],                // hashes
3444            None,                  // root hash
3445        );
3446
3447        // Pre-populate AccountsTrie with a node that will be updated (for account_nibbles1)
3448        {
3449            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
3450            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
3451        }
3452
3453        // Create account trie updates: one Some (update) and one None (removal)
3454        let account_nodes = vec![
3455            (account_nibbles1, Some(node1.clone())), // This will update existing node
3456            (account_nibbles2, None),                // This will be a removal (no existing node)
3457        ];
3458
3459        // Create storage trie updates
3460        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
3461        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie
3462
3463        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3464        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3465        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3466
3467        let storage_node1 = BranchNodeCompact::new(
3468            0b1111_0000_0000_0000,
3469            0b0000_0000_0000_0000,
3470            0b0000_0000_0000_0000,
3471            vec![],
3472            None,
3473        );
3474
3475        let storage_node2 = BranchNodeCompact::new(
3476            0b0000_1111_0000_0000,
3477            0b0000_0000_0000_0000,
3478            0b0000_0000_0000_0000,
3479            vec![],
3480            None,
3481        );
3482
3483        // Create an old version of storage_node1 to prepopulate
3484        let storage_node1_old = BranchNodeCompact::new(
3485            0b1010_0000_0000_0000, // Different mask to show it's an old value
3486            0b0000_0000_0000_0000,
3487            0b0000_0000_0000_0000,
3488            vec![],
3489            None,
3490        );
3491
3492        // Pre-populate StoragesTrie for normal storage (storage_address1)
3493        {
3494            let mut cursor =
3495                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3496            // Add node that will be updated (storage_nibbles1) with old value
3497            let entry = StorageTrieEntry {
3498                nibbles: StoredNibblesSubKey(storage_nibbles1),
3499                node: storage_node1_old.clone(),
3500            };
3501            cursor.upsert(storage_address1, &entry).unwrap();
3502        }
3503
3504        // Pre-populate StoragesTrie for wiped storage (storage_address2)
3505        {
3506            let mut cursor =
3507                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
3508            // Add node that will be updated (storage_nibbles1)
3509            let entry1 = StorageTrieEntry {
3510                nibbles: StoredNibblesSubKey(storage_nibbles1),
3511                node: storage_node1.clone(),
3512            };
3513            cursor.upsert(storage_address2, &entry1).unwrap();
3514            // Add node that won't be updated but exists (storage_nibbles3)
3515            let entry3 = StorageTrieEntry {
3516                nibbles: StoredNibblesSubKey(storage_nibbles3),
3517                node: storage_node2.clone(),
3518            };
3519            cursor.upsert(storage_address2, &entry3).unwrap();
3520        }
3521
3522        // Normal storage trie: one Some (update) and one None (new)
3523        let storage_trie1 = StorageTrieUpdatesSorted {
3524            is_deleted: false,
3525            storage_nodes: vec![
3526                (storage_nibbles1, Some(storage_node1.clone())), // This will update existing node
3527                (storage_nibbles2, None),                        // This is a new node
3528            ],
3529        };
3530
3531        // Wiped storage trie
3532        let storage_trie2 = StorageTrieUpdatesSorted {
3533            is_deleted: true,
3534            storage_nodes: vec![
3535                (storage_nibbles1, Some(storage_node1.clone())), // Updated node already in db
3536                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in db
3537                                                                  * storage_nibbles3 is in db
3538                                                                  * but not updated */
3539            ],
3540        };
3541
3542        let mut storage_tries = B256Map::default();
3543        storage_tries.insert(storage_address1, storage_trie1);
3544        storage_tries.insert(storage_address2, storage_trie2);
3545
3546        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3547
3548        // Write the changesets
3549        let num_written =
3550            provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();
3551
3552        // Verify number of entries written
3553        // Account changesets: 2 (one update, one removal)
3554        // Storage changesets:
3555        //   - Normal storage: 2 (one update, one removal)
3556        //   - Wiped storage: 3 (two updated, one existing not updated)
3557        // Total: 2 + 2 + 3 = 7
3558        assert_eq!(num_written, 7);
3559
3560        // Verify account changesets were written correctly
3561        {
3562            let mut cursor =
3563                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3564
3565            // Get all entries for this block to see what was written
3566            let all_entries = cursor
3567                .walk_dup(Some(block_number), None)
3568                .unwrap()
3569                .collect::<Result<Vec<_>, _>>()
3570                .unwrap();
3571
3572            // Assert the full value of all_entries in a single assert_eq
3573            assert_eq!(
3574                all_entries,
3575                vec![
3576                    (
3577                        block_number,
3578                        TrieChangeSetsEntry {
3579                            nibbles: StoredNibblesSubKey(account_nibbles1),
3580                            node: Some(node1),
3581                        }
3582                    ),
3583                    (
3584                        block_number,
3585                        TrieChangeSetsEntry {
3586                            nibbles: StoredNibblesSubKey(account_nibbles2),
3587                            node: None,
3588                        }
3589                    ),
3590                ]
3591            );
3592        }
3593
3594        // Verify storage changesets were written correctly
3595        {
3596            let mut cursor =
3597                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3598
3599            // Check normal storage trie changesets
3600            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3601            let entries1 =
3602                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3603
3604            assert_eq!(
3605                entries1,
3606                vec![
3607                    (
3608                        key1,
3609                        TrieChangeSetsEntry {
3610                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3611                            node: Some(storage_node1_old), // Old value that was prepopulated
3612                        }
3613                    ),
3614                    (
3615                        key1,
3616                        TrieChangeSetsEntry {
3617                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3618                            node: None, // New node, no previous value
3619                        }
3620                    ),
3621                ]
3622            );
3623
3624            // Check wiped storage trie changesets
3625            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3626            let entries2 =
3627                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3628
3629            assert_eq!(
3630                entries2,
3631                vec![
3632                    (
3633                        key2,
3634                        TrieChangeSetsEntry {
3635                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3636                            node: Some(storage_node1), // Was in db, so has old value
3637                        }
3638                    ),
3639                    (
3640                        key2,
3641                        TrieChangeSetsEntry {
3642                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3643                            node: None, // Was not in db
3644                        }
3645                    ),
3646                    (
3647                        key2,
3648                        TrieChangeSetsEntry {
3649                            nibbles: StoredNibblesSubKey(storage_nibbles3),
3650                            node: Some(storage_node2), // Existing node in wiped storage
3651                        }
3652                    ),
3653                ]
3654            );
3655        }
3656
3657        provider_rw.commit().unwrap();
3658    }
3659
3660    #[test]
3661    fn test_write_trie_changesets_with_overlay() {
3662        use reth_db_api::models::BlockNumberHashedAddress;
3663        use reth_trie::BranchNodeCompact;
3664
3665        let factory = create_test_provider_factory();
3666        let provider_rw = factory.provider_rw().unwrap();
3667
3668        let block_number = 1u64;
3669
3670        // Create some test nibbles and nodes
3671        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3672        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3673
3674        let node1 = BranchNodeCompact::new(
3675            0b1111_1111_1111_1111, // state_mask
3676            0b0000_0000_0000_0000, // tree_mask
3677            0b0000_0000_0000_0000, // hash_mask
3678            vec![],                // hashes
3679            None,                  // root hash
3680        );
3681
3682        // NOTE: Unlike the previous test, we're NOT pre-populating the database
3683        // All node values will come from the overlay
3684
3685        // Create the overlay with existing values that would normally be in the DB
3686        let node1_old = BranchNodeCompact::new(
3687            0b1010_1010_1010_1010, // Different mask to show it's the overlay "existing" value
3688            0b0000_0000_0000_0000,
3689            0b0000_0000_0000_0000,
3690            vec![],
3691            None,
3692        );
3693
3694        // Create overlay account nodes
3695        let overlay_account_nodes = vec![
3696            (account_nibbles1, Some(node1_old.clone())), // This simulates existing node in overlay
3697        ];
3698
3699        // Create account trie updates: one Some (update) and one None (removal)
3700        let account_nodes = vec![
3701            (account_nibbles1, Some(node1)), // This will update overlay node
3702            (account_nibbles2, None),        // This will be a removal (no existing node)
3703        ];
3704
3705        // Create storage trie updates
3706        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
3707        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie
3708
3709        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3710        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3711        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3712
3713        let storage_node1 = BranchNodeCompact::new(
3714            0b1111_0000_0000_0000,
3715            0b0000_0000_0000_0000,
3716            0b0000_0000_0000_0000,
3717            vec![],
3718            None,
3719        );
3720
3721        let storage_node2 = BranchNodeCompact::new(
3722            0b0000_1111_0000_0000,
3723            0b0000_0000_0000_0000,
3724            0b0000_0000_0000_0000,
3725            vec![],
3726            None,
3727        );
3728
3729        // Create old versions for overlay
3730        let storage_node1_old = BranchNodeCompact::new(
3731            0b1010_0000_0000_0000, // Different mask to show it's an old value
3732            0b0000_0000_0000_0000,
3733            0b0000_0000_0000_0000,
3734            vec![],
3735            None,
3736        );
3737
3738        // Create overlay storage nodes
3739        let mut overlay_storage_tries = B256Map::default();
3740
3741        // Overlay for normal storage (storage_address1)
3742        let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3743            is_deleted: false,
3744            storage_nodes: vec![
3745                (storage_nibbles1, Some(storage_node1_old.clone())), /* Simulates existing in
3746                                                                      * overlay */
3747            ],
3748        };
3749
3750        // Overlay for wiped storage (storage_address2)
3751        let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3752            is_deleted: false,
3753            storage_nodes: vec![
3754                (storage_nibbles1, Some(storage_node1.clone())), // Existing in overlay
3755                (storage_nibbles3, Some(storage_node2.clone())), // Also existing in overlay
3756            ],
3757        };
3758
3759        overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3760        overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3761
3762        let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3763
3764        // Normal storage trie: one Some (update) and one None (new)
3765        let storage_trie1 = StorageTrieUpdatesSorted {
3766            is_deleted: false,
3767            storage_nodes: vec![
3768                (storage_nibbles1, Some(storage_node1.clone())), // This will update overlay node
3769                (storage_nibbles2, None),                        // This is a new node
3770            ],
3771        };
3772
3773        // Wiped storage trie
3774        let storage_trie2 = StorageTrieUpdatesSorted {
3775            is_deleted: true,
3776            storage_nodes: vec![
3777                (storage_nibbles1, Some(storage_node1.clone())), // Updated node from overlay
3778                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in overlay
3779                                                                  * storage_nibbles3 is in
3780                                                                  * overlay
3781                                                                  * but not updated */
3782            ],
3783        };
3784
3785        let mut storage_tries = B256Map::default();
3786        storage_tries.insert(storage_address1, storage_trie1);
3787        storage_tries.insert(storage_address2, storage_trie2);
3788
3789        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3790
3791        // Write the changesets WITH OVERLAY
3792        let num_written =
3793            provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3794
3795        // Verify number of entries written
3796        // Account changesets: 2 (one update from overlay, one removal)
3797        // Storage changesets:
3798        //   - Normal storage: 2 (one update from overlay, one new)
3799        //   - Wiped storage: 3 (two updated, one existing from overlay not updated)
3800        // Total: 2 + 2 + 3 = 7
3801        assert_eq!(num_written, 7);
3802
3803        // Verify account changesets were written correctly
3804        {
3805            let mut cursor =
3806                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3807
3808            // Get all entries for this block to see what was written
3809            let all_entries = cursor
3810                .walk_dup(Some(block_number), None)
3811                .unwrap()
3812                .collect::<Result<Vec<_>, _>>()
3813                .unwrap();
3814
3815            // Assert the full value of all_entries in a single assert_eq
3816            assert_eq!(
3817                all_entries,
3818                vec![
3819                    (
3820                        block_number,
3821                        TrieChangeSetsEntry {
3822                            nibbles: StoredNibblesSubKey(account_nibbles1),
3823                            node: Some(node1_old), // Value from overlay, not DB
3824                        }
3825                    ),
3826                    (
3827                        block_number,
3828                        TrieChangeSetsEntry {
3829                            nibbles: StoredNibblesSubKey(account_nibbles2),
3830                            node: None,
3831                        }
3832                    ),
3833                ]
3834            );
3835        }
3836
3837        // Verify storage changesets were written correctly
3838        {
3839            let mut cursor =
3840                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3841
3842            // Check normal storage trie changesets
3843            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3844            let entries1 =
3845                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3846
3847            assert_eq!(
3848                entries1,
3849                vec![
3850                    (
3851                        key1,
3852                        TrieChangeSetsEntry {
3853                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3854                            node: Some(storage_node1_old), // Old value from overlay
3855                        }
3856                    ),
3857                    (
3858                        key1,
3859                        TrieChangeSetsEntry {
3860                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3861                            node: None, // New node, no previous value
3862                        }
3863                    ),
3864                ]
3865            );
3866
3867            // Check wiped storage trie changesets
3868            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3869            let entries2 =
3870                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3871
3872            assert_eq!(
3873                entries2,
3874                vec![
3875                    (
3876                        key2,
3877                        TrieChangeSetsEntry {
3878                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3879                            node: Some(storage_node1), // Value from overlay
3880                        }
3881                    ),
3882                    (
3883                        key2,
3884                        TrieChangeSetsEntry {
3885                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3886                            node: None, // Was not in overlay
3887                        }
3888                    ),
3889                    (
3890                        key2,
3891                        TrieChangeSetsEntry {
3892                            nibbles: StoredNibblesSubKey(storage_nibbles3),
3893                            node: Some(storage_node2), /* Existing node from overlay in wiped
3894                                                        * storage */
3895                        }
3896                    ),
3897                ]
3898            );
3899        }
3900
3901        provider_rw.commit().unwrap();
3902    }
3903
3904    #[test]
3905    fn test_clear_trie_changesets_from() {
3906        use alloy_primitives::hex_literal::hex;
3907        use reth_db_api::models::BlockNumberHashedAddress;
3908        use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3909
3910        let factory = create_test_provider_factory();
3911
3912        // Create some test data for different block numbers
3913        let block1 = 100u64;
3914        let block2 = 101u64;
3915        let block3 = 102u64;
3916        let block4 = 103u64;
3917        let block5 = 104u64;
3918
3919        // Create test addresses for storage changesets
3920        let storage_address1 =
3921            B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3922        let storage_address2 =
3923            B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3924
3925        // Create test nibbles
3926        let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3927        let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3928        let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3929
3930        // Create test nodes
3931        let node1 = BranchNodeCompact::new(
3932            0b1111_1111_1111_1111,
3933            0b1111_1111_1111_1111,
3934            0b0000_0000_0000_0001,
3935            vec![B256::from(hex!(
3936                "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3937            ))],
3938            None,
3939        );
3940        let node2 = BranchNodeCompact::new(
3941            0b1111_1111_1111_1110,
3942            0b1111_1111_1111_1110,
3943            0b0000_0000_0000_0010,
3944            vec![B256::from(hex!(
3945                "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
3946            ))],
3947            Some(B256::from(hex!(
3948                "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
3949            ))),
3950        );
3951
3952        // Populate AccountsTrieChangeSets with data across multiple blocks
3953        {
3954            let provider_rw = factory.provider_rw().unwrap();
3955            let mut cursor =
3956                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
3957
3958            // Block 100: 2 entries (will be kept - before start block)
3959            cursor
3960                .upsert(
3961                    block1,
3962                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3963                )
3964                .unwrap();
3965            cursor
3966                .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3967                .unwrap();
3968
3969            // Block 101: 3 entries with duplicates (will be deleted - from this block onwards)
3970            cursor
3971                .upsert(
3972                    block2,
3973                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3974                )
3975                .unwrap();
3976            cursor
3977                .upsert(
3978                    block2,
3979                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3980                )
3981                .unwrap(); // duplicate key
3982            cursor
3983                .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3984                .unwrap();
3985
3986            // Block 102: 2 entries (will be deleted - after start block)
3987            cursor
3988                .upsert(
3989                    block3,
3990                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3991                )
3992                .unwrap();
3993            cursor
3994                .upsert(
3995                    block3,
3996                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
3997                )
3998                .unwrap();
3999
4000            // Block 103: 1 entry (will be deleted - after start block)
4001            cursor
4002                .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4003                .unwrap();
4004
4005            // Block 104: 2 entries (will be deleted - after start block)
4006            cursor
4007                .upsert(
4008                    block5,
4009                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4010                )
4011                .unwrap();
4012            cursor
4013                .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4014                .unwrap();
4015
4016            provider_rw.commit().unwrap();
4017        }
4018
4019        // Populate StoragesTrieChangeSets with data across multiple blocks
4020        {
4021            let provider_rw = factory.provider_rw().unwrap();
4022            let mut cursor =
4023                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4024
4025            // Block 100, address1: 2 entries (will be kept - before start block)
4026            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4027            cursor
4028                .upsert(
4029                    key1_block1,
4030                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
4031                )
4032                .unwrap();
4033            cursor
4034                .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
4035                .unwrap();
4036
4037            // Block 101, address1: 3 entries with duplicates (will be deleted - from this block
4038            // onwards)
4039            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4040            cursor
4041                .upsert(
4042                    key1_block2,
4043                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
4044                )
4045                .unwrap();
4046            cursor
4047                .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
4048                .unwrap(); // duplicate key
4049            cursor
4050                .upsert(
4051                    key1_block2,
4052                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
4053                )
4054                .unwrap();
4055
4056            // Block 102, address2: 2 entries (will be deleted - after start block)
4057            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4058            cursor
4059                .upsert(
4060                    key2_block3,
4061                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
4062                )
4063                .unwrap();
4064            cursor
4065                .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
4066                .unwrap();
4067
4068            // Block 103, address1: 2 entries with duplicate (will be deleted - after start block)
4069            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4070            cursor
4071                .upsert(
4072                    key1_block4,
4073                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4074                )
4075                .unwrap();
4076            cursor
4077                .upsert(
4078                    key1_block4,
4079                    &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4080                )
4081                .unwrap(); // duplicate key
4082
4083            // Block 104, address2: 2 entries (will be deleted - after start block)
4084            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4085            cursor
4086                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4087                .unwrap();
4088            cursor
4089                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4090                .unwrap();
4091
4092            provider_rw.commit().unwrap();
4093        }
4094
4095        // Clear all changesets from block 101 onwards
4096        {
4097            let provider_rw = factory.provider_rw().unwrap();
4098            provider_rw.clear_trie_changesets_from(block2).unwrap();
4099            provider_rw.commit().unwrap();
4100        }
4101
4102        // Verify AccountsTrieChangeSets after clearing
4103        {
4104            let provider = factory.provider().unwrap();
4105            let mut cursor =
4106                provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4107
4108            // Block 100 should still exist (before range)
4109            let block1_entries = cursor
4110                .walk_dup(Some(block1), None)
4111                .unwrap()
4112                .collect::<Result<Vec<_>, _>>()
4113                .unwrap();
4114            assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4115            assert_eq!(block1_entries[0].0, block1);
4116            assert_eq!(block1_entries[1].0, block1);
4117
4118            // Blocks 101-104 should be deleted
4119            let block2_entries = cursor
4120                .walk_dup(Some(block2), None)
4121                .unwrap()
4122                .collect::<Result<Vec<_>, _>>()
4123                .unwrap();
4124            assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4125
4126            let block3_entries = cursor
4127                .walk_dup(Some(block3), None)
4128                .unwrap()
4129                .collect::<Result<Vec<_>, _>>()
4130                .unwrap();
4131            assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4132
4133            let block4_entries = cursor
4134                .walk_dup(Some(block4), None)
4135                .unwrap()
4136                .collect::<Result<Vec<_>, _>>()
4137                .unwrap();
4138            assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4139
4140            // Block 104 should also be deleted
4141            let block5_entries = cursor
4142                .walk_dup(Some(block5), None)
4143                .unwrap()
4144                .collect::<Result<Vec<_>, _>>()
4145                .unwrap();
4146            assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4147        }
4148
4149        // Verify StoragesTrieChangeSets after clearing
4150        {
4151            let provider = factory.provider().unwrap();
4152            let mut cursor =
4153                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4154
4155            // Block 100 entries should still exist (before range)
4156            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4157            let block1_entries = cursor
4158                .walk_dup(Some(key1_block1), None)
4159                .unwrap()
4160                .collect::<Result<Vec<_>, _>>()
4161                .unwrap();
4162            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4163
4164            // Blocks 101-104 entries should be deleted
4165            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4166            let block2_entries = cursor
4167                .walk_dup(Some(key1_block2), None)
4168                .unwrap()
4169                .collect::<Result<Vec<_>, _>>()
4170                .unwrap();
4171            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4172
4173            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4174            let block3_entries = cursor
4175                .walk_dup(Some(key2_block3), None)
4176                .unwrap()
4177                .collect::<Result<Vec<_>, _>>()
4178                .unwrap();
4179            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4180
4181            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4182            let block4_entries = cursor
4183                .walk_dup(Some(key1_block4), None)
4184                .unwrap()
4185                .collect::<Result<Vec<_>, _>>()
4186                .unwrap();
4187            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4188
4189            // Block 104 entries should also be deleted
4190            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4191            let block5_entries = cursor
4192                .walk_dup(Some(key2_block5), None)
4193                .unwrap()
4194                .collect::<Result<Vec<_>, _>>()
4195                .unwrap();
4196            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4197        }
4198    }
4199
4200    #[test]
4201    fn test_write_trie_updates_sorted() {
4202        use reth_trie::{
4203            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4204            BranchNodeCompact, StorageTrieEntry,
4205        };
4206
4207        let factory = create_test_provider_factory();
4208        let provider_rw = factory.provider_rw().unwrap();
4209
4210        // Pre-populate account trie with data that will be deleted
4211        {
4212            let tx = provider_rw.tx_ref();
4213            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4214
4215            // Add account node that will be deleted
4216            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4217            cursor
4218                .upsert(
4219                    to_delete,
4220                    &BranchNodeCompact::new(
4221                        0b1010_1010_1010_1010, // state_mask
4222                        0b0000_0000_0000_0000, // tree_mask
4223                        0b0000_0000_0000_0000, // hash_mask
4224                        vec![],
4225                        None,
4226                    ),
4227                )
4228                .unwrap();
4229
4230            // Add account node that will be updated
4231            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4232            cursor
4233                .upsert(
4234                    to_update,
4235                    &BranchNodeCompact::new(
4236                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4237                        0b0000_0000_0000_0000, // tree_mask
4238                        0b0000_0000_0000_0000, // hash_mask
4239                        vec![],
4240                        None,
4241                    ),
4242                )
4243                .unwrap();
4244        }
4245
4246        // Pre-populate storage tries with data
4247        let storage_address1 = B256::from([1u8; 32]);
4248        let storage_address2 = B256::from([2u8; 32]);
4249        {
4250            let tx = provider_rw.tx_ref();
4251            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4252
4253            // Add storage nodes for address1 (one will be deleted)
4254            storage_cursor
4255                .upsert(
4256                    storage_address1,
4257                    &StorageTrieEntry {
4258                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4259                        node: BranchNodeCompact::new(
4260                            0b0011_0011_0011_0011, // will be deleted
4261                            0b0000_0000_0000_0000,
4262                            0b0000_0000_0000_0000,
4263                            vec![],
4264                            None,
4265                        ),
4266                    },
4267                )
4268                .unwrap();
4269
4270            // Add storage nodes for address2 (will be wiped)
4271            storage_cursor
4272                .upsert(
4273                    storage_address2,
4274                    &StorageTrieEntry {
4275                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4276                        node: BranchNodeCompact::new(
4277                            0b1100_1100_1100_1100, // will be wiped
4278                            0b0000_0000_0000_0000,
4279                            0b0000_0000_0000_0000,
4280                            vec![],
4281                            None,
4282                        ),
4283                    },
4284                )
4285                .unwrap();
4286            storage_cursor
4287                .upsert(
4288                    storage_address2,
4289                    &StorageTrieEntry {
4290                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4291                        node: BranchNodeCompact::new(
4292                            0b0011_1100_0011_1100, // will be wiped
4293                            0b0000_0000_0000_0000,
4294                            0b0000_0000_0000_0000,
4295                            vec![],
4296                            None,
4297                        ),
4298                    },
4299                )
4300                .unwrap();
4301        }
4302
4303        // Create sorted account trie updates
4304        let account_nodes = vec![
4305            (
4306                Nibbles::from_nibbles([0x1, 0x2]),
4307                Some(BranchNodeCompact::new(
4308                    0b1111_1111_1111_1111, // state_mask (updated)
4309                    0b0000_0000_0000_0000, // tree_mask
4310                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4311                    vec![],
4312                    None,
4313                )),
4314            ),
4315            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4316            (
4317                Nibbles::from_nibbles([0x5, 0x6]),
4318                Some(BranchNodeCompact::new(
4319                    0b1111_1111_1111_1111, // state_mask
4320                    0b0000_0000_0000_0000, // tree_mask
4321                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4322                    vec![],
4323                    None,
4324                )),
4325            ),
4326        ];
4327
4328        // Create sorted storage trie updates
4329        let storage_trie1 = StorageTrieUpdatesSorted {
4330            is_deleted: false,
4331            storage_nodes: vec![
4332                (
4333                    Nibbles::from_nibbles([0x1, 0x0]),
4334                    Some(BranchNodeCompact::new(
4335                        0b1111_0000_0000_0000, // state_mask
4336                        0b0000_0000_0000_0000, // tree_mask
4337                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4338                        vec![],
4339                        None,
4340                    )),
4341                ),
4342                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4343            ],
4344        };
4345
4346        let storage_trie2 = StorageTrieUpdatesSorted {
4347            is_deleted: true, // Wipe all storage for this address
4348            storage_nodes: vec![],
4349        };
4350
4351        let mut storage_tries = B256Map::default();
4352        storage_tries.insert(storage_address1, storage_trie1);
4353        storage_tries.insert(storage_address2, storage_trie2);
4354
4355        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4356
4357        // Write the sorted trie updates
4358        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4359
4360        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4361        // storage deletion = 5
4362        assert_eq!(num_entries, 5);
4363
4364        // Verify account trie updates were written correctly
4365        let tx = provider_rw.tx_ref();
4366        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4367
4368        // Check first account node was updated
4369        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4370        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4371        assert!(entry1.is_some(), "Updated account node should exist");
4372        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4373        assert_eq!(
4374            entry1.unwrap().1.state_mask,
4375            expected_mask,
4376            "Account node should have updated state_mask"
4377        );
4378
4379        // Check deleted account node no longer exists
4380        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4381        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4382        assert!(entry2.is_none(), "Deleted account node should not exist");
4383
4384        // Check new account node exists
4385        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4386        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4387        assert!(entry3.is_some(), "New account node should exist");
4388
4389        // Verify storage trie updates were written correctly
4390        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4391
4392        // Check storage for address1
4393        let storage_entries1: Vec<_> = storage_cursor
4394            .walk_dup(Some(storage_address1), None)
4395            .unwrap()
4396            .collect::<Result<Vec<_>, _>>()
4397            .unwrap();
4398        assert_eq!(
4399            storage_entries1.len(),
4400            1,
4401            "Storage address1 should have 1 entry after deletion"
4402        );
4403        assert_eq!(
4404            storage_entries1[0].1.nibbles.0,
4405            Nibbles::from_nibbles([0x1, 0x0]),
4406            "Remaining entry should be [0x1, 0x0]"
4407        );
4408
4409        // Check storage for address2 was wiped
4410        let storage_entries2: Vec<_> = storage_cursor
4411            .walk_dup(Some(storage_address2), None)
4412            .unwrap()
4413            .collect::<Result<Vec<_>, _>>()
4414            .unwrap();
4415        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4416
4417        provider_rw.commit().unwrap();
4418    }
4419
4420    #[test]
4421    fn test_get_block_trie_updates() {
4422        use reth_db_api::models::BlockNumberHashedAddress;
4423        use reth_trie::{BranchNodeCompact, StorageTrieEntry};
4424
4425        let factory = create_test_provider_factory();
4426        let provider_rw = factory.provider_rw().unwrap();
4427
4428        let target_block = 2u64;
4429        let next_block = 3u64;
4430
4431        // Create test nibbles and nodes for accounts
4432        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
4433        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
4434        let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);
4435
4436        let node1 = BranchNodeCompact::new(
4437            0b1111_1111_0000_0000,
4438            0b0000_0000_0000_0000,
4439            0b0000_0000_0000_0000,
4440            vec![],
4441            None,
4442        );
4443
4444        let node2 = BranchNodeCompact::new(
4445            0b0000_0000_1111_1111,
4446            0b0000_0000_0000_0000,
4447            0b0000_0000_0000_0000,
4448            vec![],
4449            None,
4450        );
4451
4452        let node3 = BranchNodeCompact::new(
4453            0b1010_1010_1010_1010,
4454            0b0000_0000_0000_0000,
4455            0b0000_0000_0000_0000,
4456            vec![],
4457            None,
4458        );
4459
4460        // Pre-populate AccountsTrie with nodes that will be the final state
4461        {
4462            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
4463            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
4464            cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
4465            // account_nibbles3 will be deleted (not in final state)
4466        }
4467
4468        // Insert trie changesets for target_block
4469        {
4470            let mut cursor =
4471                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4472            // nibbles1 was updated in target_block (old value stored)
4473            cursor
4474                .append_dup(
4475                    target_block,
4476                    TrieChangeSetsEntry {
4477                        nibbles: StoredNibblesSubKey(account_nibbles1),
4478                        node: Some(BranchNodeCompact::new(
4479                            0b1111_0000_0000_0000, // old value
4480                            0b0000_0000_0000_0000,
4481                            0b0000_0000_0000_0000,
4482                            vec![],
4483                            None,
4484                        )),
4485                    },
4486                )
4487                .unwrap();
4488            // nibbles2 was created in target_block (no old value)
4489            cursor
4490                .append_dup(
4491                    target_block,
4492                    TrieChangeSetsEntry {
4493                        nibbles: StoredNibblesSubKey(account_nibbles2),
4494                        node: None,
4495                    },
4496                )
4497                .unwrap();
4498        }
4499
4500        // Insert trie changesets for next_block (to test overlay)
4501        {
4502            let mut cursor =
4503                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4504            // nibbles3 was deleted in next_block (old value stored)
4505            cursor
4506                .append_dup(
4507                    next_block,
4508                    TrieChangeSetsEntry {
4509                        nibbles: StoredNibblesSubKey(account_nibbles3),
4510                        node: Some(node3),
4511                    },
4512                )
4513                .unwrap();
4514        }
4515
4516        // Storage trie updates
4517        let storage_address1 = B256::from([1u8; 32]);
4518        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
4519        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
4520
4521        let storage_node1 = BranchNodeCompact::new(
4522            0b1111_1111_1111_0000,
4523            0b0000_0000_0000_0000,
4524            0b0000_0000_0000_0000,
4525            vec![],
4526            None,
4527        );
4528
4529        let storage_node2 = BranchNodeCompact::new(
4530            0b0101_0101_0101_0101,
4531            0b0000_0000_0000_0000,
4532            0b0000_0000_0000_0000,
4533            vec![],
4534            None,
4535        );
4536
4537        // Pre-populate StoragesTrie with final state
4538        {
4539            let mut cursor =
4540                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
4541            cursor
4542                .upsert(
4543                    storage_address1,
4544                    &StorageTrieEntry {
4545                        nibbles: StoredNibblesSubKey(storage_nibbles1),
4546                        node: storage_node1.clone(),
4547                    },
4548                )
4549                .unwrap();
4550            // storage_nibbles2 was deleted in next_block, so it's not in final state
4551        }
4552
4553        // Insert storage trie changesets for target_block
4554        {
4555            let mut cursor =
4556                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4557            let key = BlockNumberHashedAddress((target_block, storage_address1));
4558
4559            // storage_nibbles1 was updated
4560            cursor
4561                .append_dup(
4562                    key,
4563                    TrieChangeSetsEntry {
4564                        nibbles: StoredNibblesSubKey(storage_nibbles1),
4565                        node: Some(BranchNodeCompact::new(
4566                            0b0000_0000_1111_1111, // old value
4567                            0b0000_0000_0000_0000,
4568                            0b0000_0000_0000_0000,
4569                            vec![],
4570                            None,
4571                        )),
4572                    },
4573                )
4574                .unwrap();
4575
4576            // storage_nibbles2 was created
4577            cursor
4578                .append_dup(
4579                    key,
4580                    TrieChangeSetsEntry {
4581                        nibbles: StoredNibblesSubKey(storage_nibbles2),
4582                        node: None,
4583                    },
4584                )
4585                .unwrap();
4586        }
4587
4588        // Insert storage trie changesets for next_block (to test overlay)
4589        {
4590            let mut cursor =
4591                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4592            let key = BlockNumberHashedAddress((next_block, storage_address1));
4593
4594            // storage_nibbles2 was deleted in next_block
4595            cursor
4596                .append_dup(
4597                    key,
4598                    TrieChangeSetsEntry {
4599                        nibbles: StoredNibblesSubKey(storage_nibbles2),
4600                        node: Some(BranchNodeCompact::new(
4601                            0b0101_0101_0101_0101, // value that was deleted
4602                            0b0000_0000_0000_0000,
4603                            0b0000_0000_0000_0000,
4604                            vec![],
4605                            None,
4606                        )),
4607                    },
4608                )
4609                .unwrap();
4610        }
4611
4612        provider_rw.commit().unwrap();
4613
4614        // Now test get_block_trie_updates
4615        let provider = factory.provider().unwrap();
4616        let result = provider.get_block_trie_updates(target_block).unwrap();
4617
4618        // Verify account trie updates
4619        assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");
4620
4621        // Check nibbles1 - should have the current value (node1)
4622        let nibbles1_update = result
4623            .account_nodes_ref()
4624            .iter()
4625            .find(|(n, _)| n == &account_nibbles1)
4626            .expect("Should find nibbles1");
4627        assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
4628        assert_eq!(
4629            nibbles1_update.1.as_ref().unwrap().state_mask,
4630            node1.state_mask,
4631            "nibbles1 should have current value"
4632        );
4633
4634        // Check nibbles2 - should have the current value (node2)
4635        let nibbles2_update = result
4636            .account_nodes_ref()
4637            .iter()
4638            .find(|(n, _)| n == &account_nibbles2)
4639            .expect("Should find nibbles2");
4640        assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
4641        assert_eq!(
4642            nibbles2_update.1.as_ref().unwrap().state_mask,
4643            node2.state_mask,
4644            "nibbles2 should have current value"
4645        );
4646
4647        // nibbles3 should NOT be in the result (it was changed in next_block, not target_block)
4648        assert!(
4649            !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
4650            "nibbles3 should not be in target_block updates"
4651        );
4652
4653        // Verify storage trie updates
4654        assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
4655        let storage_updates = result
4656            .storage_tries_ref()
4657            .get(&storage_address1)
4658            .expect("Should have storage updates for address1");
4659
4660        assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");
4661
4662        // Check storage_nibbles1 - should have current value
4663        let storage1_update = storage_updates
4664            .storage_nodes
4665            .iter()
4666            .find(|(n, _)| n == &storage_nibbles1)
4667            .expect("Should find storage_nibbles1");
4668        assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
4669        assert_eq!(
4670            storage1_update.1.as_ref().unwrap().state_mask,
4671            storage_node1.state_mask,
4672            "storage_nibbles1 should have current value"
4673        );
4674
4675        // Check storage_nibbles2 - was created in target_block, will be deleted in next_block
4676        // So it should have a value (the value that will be deleted)
4677        let storage2_update = storage_updates
4678            .storage_nodes
4679            .iter()
4680            .find(|(n, _)| n == &storage_nibbles2)
4681            .expect("Should find storage_nibbles2");
4682        assert!(
4683            storage2_update.1.is_some(),
4684            "storage_nibbles2 should have a value (the node that will be deleted in next block)"
4685        );
4686        assert_eq!(
4687            storage2_update.1.as_ref().unwrap().state_mask,
4688            storage_node2.state_mask,
4689            "storage_nibbles2 should have the value that was created and will be deleted"
4690        );
4691    }
4692
4693    #[test]
4694    fn test_prunable_receipts_logic() {
4695        let insert_blocks =
4696            |provider_rw: &DatabaseProviderRW<_, _>, tip_block: u64, tx_count: u8| {
4697                let mut rng = generators::rng();
4698                for block_num in 0..=tip_block {
4699                    let block = random_block(
4700                        &mut rng,
4701                        block_num,
4702                        BlockParams { tx_count: Some(tx_count), ..Default::default() },
4703                    );
4704                    provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
4705                }
4706            };
4707
4708        let write_receipts = |provider_rw: DatabaseProviderRW<_, _>, block: u64| {
4709            let outcome = ExecutionOutcome {
4710                first_block: block,
4711                receipts: vec![vec![Receipt {
4712                    tx_type: Default::default(),
4713                    success: true,
4714                    cumulative_gas_used: block, // identifier to assert against
4715                    logs: vec![],
4716                }]],
4717                ..Default::default()
4718            };
4719            provider_rw.write_state(&outcome, crate::OriginalValuesKnown::No).unwrap();
4720            provider_rw.commit().unwrap();
4721        };
4722
4723        // Legacy mode (receipts in DB) - should be prunable
4724        {
4725            let factory = create_test_provider_factory();
4726            let storage_settings = StorageSettings::legacy();
4727            factory.set_storage_settings_cache(storage_settings);
4728            let factory = factory.with_prune_modes(PruneModes {
4729                receipts: Some(PruneMode::Before(100)),
4730                ..Default::default()
4731            });
4732
4733            let tip_block = 200u64;
4734            let first_block = 1u64;
4735
4736            // create chain
4737            let provider_rw = factory.provider_rw().unwrap();
4738            insert_blocks(&provider_rw, tip_block, 1);
4739            provider_rw.commit().unwrap();
4740
4741            write_receipts(
4742                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4743                first_block,
4744            );
4745            write_receipts(
4746                factory.provider_rw().unwrap().with_minimum_pruning_distance(100),
4747                tip_block - 1,
4748            );
4749
4750            let provider = factory.provider().unwrap();
4751
4752            for (block, num_receipts) in [(0, 0), (tip_block - 1, 1)] {
4753                assert!(provider
4754                    .receipts_by_block(block.into())
4755                    .unwrap()
4756                    .is_some_and(|r| r.len() == num_receipts));
4757            }
4758        }
4759
4760        // Static files mode
4761        {
4762            let factory = create_test_provider_factory();
4763            let storage_settings = StorageSettings::legacy().with_receipts_in_static_files(true);
4764            factory.set_storage_settings_cache(storage_settings);
4765            let factory = factory.with_prune_modes(PruneModes {
4766                receipts: Some(PruneMode::Before(2)),
4767                ..Default::default()
4768            });
4769
4770            let tip_block = 200u64;
4771
4772            // create chain
4773            let provider_rw = factory.provider_rw().unwrap();
4774            insert_blocks(&provider_rw, tip_block, 1);
4775            provider_rw.commit().unwrap();
4776
4777            // Attempt to write receipts for block 0 and 1 (should be skipped)
4778            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 0);
4779            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 1);
4780
4781            assert!(factory
4782                .static_file_provider()
4783                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4784                .is_none(),);
4785            assert!(factory
4786                .static_file_provider()
4787                .get_highest_static_file_block(StaticFileSegment::Receipts)
4788                .is_some_and(|b| b == 1),);
4789
4790            // Since we have prune mode Before(2), the next receipt (block 2) should be written to
4791            // static files.
4792            write_receipts(factory.provider_rw().unwrap().with_minimum_pruning_distance(100), 2);
4793            assert!(factory
4794                .static_file_provider()
4795                .get_highest_static_file_tx(StaticFileSegment::Receipts)
4796                .is_some_and(|num| num == 2),);
4797
4798            // After having a receipt already in static files, attempt to skip the next receipt by
4799            // changing the prune mode. It should NOT skip it and should still write the receipt,
4800            // since static files do not support gaps.
4801            let factory = factory.with_prune_modes(PruneModes {
4802                receipts: Some(PruneMode::Before(100)),
4803                ..Default::default()
4804            });
4805            let provider_rw = factory.provider_rw().unwrap().with_minimum_pruning_distance(1);
4806            assert!(PruneMode::Distance(1).should_prune(3, tip_block));
4807            write_receipts(provider_rw, 3);
4808
4809            // Ensure we can only fetch the 2 last receipts.
4810            //
4811            // Test setup only has 1 tx per block and each receipt has its cumulative_gas_used set
4812            // to the block number it belongs to easily identify and assert.
4813            let provider = factory.provider().unwrap();
4814            assert!(EitherWriter::receipts_destination(&provider).is_static_file());
4815            for (num, num_receipts) in [(0, 0), (1, 0), (2, 1), (3, 1)] {
4816                assert!(provider
4817                    .receipts_by_block(num.into())
4818                    .unwrap()
4819                    .is_some_and(|r| r.len() == num_receipts));
4820
4821                let receipt = provider.receipt(num).unwrap();
4822                if num_receipts > 0 {
4823                    assert!(receipt.is_some_and(|r| r.cumulative_gas_used == num));
4824                } else {
4825                    assert!(receipt.is_none());
4826                }
4827            }
4828        }
4829    }
4830}