reth_provider/providers/database/
provider.rs

1use crate::{
2    changesets_utils::{
3        storage_trie_wiped_changeset_iter, StorageRevertsIter, StorageTrieCurrentValuesIter,
4    },
5    providers::{
6        database::{chain::ChainStorage, metrics},
7        static_file::StaticFileWriter,
8        NodeTypesForProvider, StaticFileProvider,
9    },
10    to_range,
11    traits::{
12        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
13    },
14    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
15    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
16    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
17    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
18    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
19    StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
20    StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
21    TransactionsProviderExt, TrieReader, TrieWriter,
22};
23use alloy_consensus::{
24    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
25    BlockHeader,
26};
27use alloy_eips::BlockHashOrNumber;
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, B256Map, HashMap, HashSet},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
32};
33use itertools::Itertools;
34use rayon::slice::ParallelSliceMut;
35use reth_chain_state::ExecutedBlock;
36use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
37use reth_db_api::{
38    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
39    database::Database,
40    models::{
41        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
42        BlockNumberHashedAddress, ShardedKey, StoredBlockBodyIndices,
43    },
44    table::Table,
45    tables,
46    transaction::{DbTx, DbTxMut},
47    BlockNumberList, PlainAccountState, PlainStorageState,
48};
49use reth_execution_types::{Chain, ExecutionOutcome};
50use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
51use reth_primitives_traits::{
52    Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
53};
54use reth_prune_types::{
55    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE, PRUNE_SEGMENTS,
56};
57use reth_stages_types::{StageCheckpoint, StageId};
58use reth_static_file_types::StaticFileSegment;
59use reth_storage_api::{
60    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
61    StorageChangeSetReader, TryIntoHistoricalStateProvider,
62};
63use reth_storage_errors::provider::ProviderResult;
64use reth_trie::{
65    trie_cursor::{
66        InMemoryTrieCursor, InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory,
67        TrieCursorIter,
68    },
69    updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
70    BranchNodeCompact, HashedPostStateSorted, Nibbles, StoredNibbles, StoredNibblesSubKey,
71    TrieChangeSetsEntry,
72};
73use reth_trie_db::{
74    DatabaseAccountTrieCursor, DatabaseStorageTrieCursor, DatabaseTrieCursorFactory,
75};
76use revm_database::states::{
77    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
78};
79use std::{
80    cmp::Ordering,
81    collections::{BTreeMap, BTreeSet},
82    fmt::Debug,
83    ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeFrom, RangeInclusive},
84    sync::Arc,
85};
86use tracing::{debug, trace};
87
88/// A [`DatabaseProvider`] that holds a read-only database transaction.
89pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
90
91/// A [`DatabaseProvider`] that holds a read-write database transaction.
92///
93/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
94/// Once that issue is solved, we can probably revert back to being an alias type.
95#[derive(Debug)]
96pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
97    pub DatabaseProvider<<DB as Database>::TXMut, N>,
98);
99
100impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
101    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
102
103    fn deref(&self) -> &Self::Target {
104        &self.0
105    }
106}
107
108impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
109    fn deref_mut(&mut self) -> &mut Self::Target {
110        &mut self.0
111    }
112}
113
114impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
115    for DatabaseProviderRW<DB, N>
116{
117    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
118        &self.0
119    }
120}
121
122impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
123    /// Commit database transaction and static file if it exists.
124    pub fn commit(self) -> ProviderResult<bool> {
125        self.0.commit()
126    }
127
128    /// Consume `DbTx` or `DbTxMut`.
129    pub fn into_tx(self) -> <DB as Database>::TXMut {
130        self.0.into_tx()
131    }
132}
133
134impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
135    for DatabaseProvider<<DB as Database>::TXMut, N>
136{
137    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
138        provider.0
139    }
140}
141
142/// A provider struct that fetches data from the database.
143/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
144#[derive(Debug)]
145pub struct DatabaseProvider<TX, N: NodeTypes> {
146    /// Database transaction.
147    tx: TX,
148    /// Chain spec
149    chain_spec: Arc<N::ChainSpec>,
150    /// Static File provider
151    static_file_provider: StaticFileProvider<N::Primitives>,
152    /// Pruning configuration
153    prune_modes: PruneModes,
154    /// Node storage handler.
155    storage: Arc<N::Storage>,
156}
157
158impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
159    /// Returns reference to prune modes.
160    pub const fn prune_modes_ref(&self) -> &PruneModes {
161        &self.prune_modes
162    }
163}
164
165impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
166    /// State provider for latest state
167    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
168        trace!(target: "providers::db", "Returning latest state provider");
169        Box::new(LatestStateProviderRef::new(self))
170    }
171
172    /// Storage provider for state at that given block hash
173    pub fn history_by_block_hash<'a>(
174        &'a self,
175        block_hash: BlockHash,
176    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
177        let mut block_number =
178            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
179        if block_number == self.best_block_number().unwrap_or_default() &&
180            block_number == self.last_block_number().unwrap_or_default()
181        {
182            return Ok(Box::new(LatestStateProviderRef::new(self)))
183        }
184
185        // +1 as the changeset that we want is the one that was applied after this block.
186        block_number += 1;
187
188        let account_history_prune_checkpoint =
189            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
190        let storage_history_prune_checkpoint =
191            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
192
193        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
194
195        // If we pruned account or storage history, we can't return state on every historical block.
196        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
197        if let Some(prune_checkpoint_block_number) =
198            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
199        {
200            state_provider = state_provider.with_lowest_available_account_history_block_number(
201                prune_checkpoint_block_number + 1,
202            );
203        }
204        if let Some(prune_checkpoint_block_number) =
205            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
206        {
207            state_provider = state_provider.with_lowest_available_storage_history_block_number(
208                prune_checkpoint_block_number + 1,
209            );
210        }
211
212        Ok(Box::new(state_provider))
213    }
214
215    #[cfg(feature = "test-utils")]
216    /// Sets the prune modes for provider.
217    pub const fn set_prune_modes(&mut self, prune_modes: PruneModes) {
218        self.prune_modes = prune_modes;
219    }
220}
221
222impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
223    type Primitives = N::Primitives;
224}
225
226impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
227    /// Returns a static file provider
228    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
229        self.static_file_provider.clone()
230    }
231}
232
233impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
234    for DatabaseProvider<TX, N>
235{
236    type ChainSpec = N::ChainSpec;
237
238    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
239        self.chain_spec.clone()
240    }
241}
242
243impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
244    /// Creates a provider with an inner read-write transaction.
245    pub const fn new_rw(
246        tx: TX,
247        chain_spec: Arc<N::ChainSpec>,
248        static_file_provider: StaticFileProvider<N::Primitives>,
249        prune_modes: PruneModes,
250        storage: Arc<N::Storage>,
251    ) -> Self {
252        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
253    }
254}
255
256impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
257    fn as_ref(&self) -> &Self {
258        self
259    }
260}
261
262impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
263    /// Writes executed blocks and state to storage.
264    pub fn save_blocks(&self, blocks: Vec<ExecutedBlock<N::Primitives>>) -> ProviderResult<()> {
265        if blocks.is_empty() {
266            debug!(target: "providers::db", "Attempted to write empty block range");
267            return Ok(())
268        }
269
270        // NOTE: checked non-empty above
271        let first_block = blocks.first().unwrap().recovered_block();
272
273        let last_block = blocks.last().unwrap().recovered_block();
274        let first_number = first_block.number();
275        let last_block_number = last_block.number();
276
277        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");
278
279        // TODO: Do performant / batched writes for each type of object
280        // instead of a loop over all blocks,
281        // meaning:
282        //  * blocks
283        //  * state
284        //  * hashed state
285        //  * trie updates (cannot naively extend, need helper)
286        //  * indices (already done basically)
287        // Insert the blocks
288        for ExecutedBlock { recovered_block, execution_output, hashed_state, trie_updates } in
289            blocks
290        {
291            let block_number = recovered_block.number();
292            self.insert_block(Arc::unwrap_or_clone(recovered_block))?;
293
294            // Write state and changesets to the database.
295            // Must be written after blocks because of the receipt lookup.
296            self.write_state(&execution_output, OriginalValuesKnown::No)?;
297
298            // insert hashes and intermediate merkle nodes
299            self.write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?;
300
301            // sort trie updates and insert changesets
302            let trie_updates_sorted = (*trie_updates).clone().into_sorted();
303            self.write_trie_changesets(block_number, &trie_updates_sorted, None)?;
304            self.write_trie_updates_sorted(&trie_updates_sorted)?;
305        }
306
307        // update history indices
308        self.update_history_indices(first_number..=last_block_number)?;
309
310        // Update pipeline progress
311        self.update_pipeline_stages(last_block_number, false)?;
312
313        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");
314
315        Ok(())
316    }
317
318    /// Unwinds trie state starting at and including the given block.
319    ///
320    /// This includes calculating the resulted state root and comparing it with the parent block
321    /// state root.
322    pub fn unwind_trie_state_from(&self, from: BlockNumber) -> ProviderResult<()> {
323        let changed_accounts = self
324            .tx
325            .cursor_read::<tables::AccountChangeSets>()?
326            .walk_range(from..)?
327            .collect::<Result<Vec<_>, _>>()?;
328
329        // Unwind account hashes.
330        self.unwind_account_hashing(changed_accounts.iter())?;
331
332        // Unwind account history indices.
333        self.unwind_account_history_indices(changed_accounts.iter())?;
334
335        let storage_start = BlockNumberAddress((from, Address::ZERO));
336        let changed_storages = self
337            .tx
338            .cursor_read::<tables::StorageChangeSets>()?
339            .walk_range(storage_start..)?
340            .collect::<Result<Vec<_>, _>>()?;
341
342        // Unwind storage hashes.
343        self.unwind_storage_hashing(changed_storages.iter().copied())?;
344
345        // Unwind storage history indices.
346        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
347
348        // Unwind accounts/storages trie tables using the revert.
349        let trie_revert = self.trie_reverts(from)?;
350        self.write_trie_updates_sorted(&trie_revert)?;
351
352        // Clear trie changesets which have been unwound.
353        self.clear_trie_changesets_from(from)?;
354
355        Ok(())
356    }
357
358    /// Removes receipts from all transactions starting with provided number (inclusive).
359    fn remove_receipts_from(
360        &self,
361        from_tx: TxNumber,
362        last_block: BlockNumber,
363    ) -> ProviderResult<()> {
364        // iterate over block body and remove receipts
365        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
366
367        if !self.prune_modes.has_receipts_pruning() {
368            let static_file_receipt_num =
369                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
370
371            let to_delete = static_file_receipt_num
372                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
373                .unwrap_or_default();
374
375            self.static_file_provider
376                .latest_writer(StaticFileSegment::Receipts)?
377                .prune_receipts(to_delete, last_block)?;
378        }
379
380        Ok(())
381    }
382}
383
384impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
385    fn try_into_history_at_block(
386        self,
387        mut block_number: BlockNumber,
388    ) -> ProviderResult<StateProviderBox> {
389        // if the block number is the same as the currently best block number on disk we can use the
390        // latest state provider here
391        if block_number == self.best_block_number().unwrap_or_default() {
392            return Ok(Box::new(LatestStateProvider::new(self)))
393        }
394
395        // +1 as the changeset that we want is the one that was applied after this block.
396        block_number += 1;
397
398        let account_history_prune_checkpoint =
399            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
400        let storage_history_prune_checkpoint =
401            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
402
403        let mut state_provider = HistoricalStateProvider::new(self, block_number);
404
405        // If we pruned account or storage history, we can't return state on every historical block.
406        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
407        if let Some(prune_checkpoint_block_number) =
408            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
409        {
410            state_provider = state_provider.with_lowest_available_account_history_block_number(
411                prune_checkpoint_block_number + 1,
412            );
413        }
414        if let Some(prune_checkpoint_block_number) =
415            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
416        {
417            state_provider = state_provider.with_lowest_available_storage_history_block_number(
418                prune_checkpoint_block_number + 1,
419            );
420        }
421
422        Ok(Box::new(state_provider))
423    }
424}
425
426/// For a given key, unwind all history shards that contain block numbers at or above the given
427/// block number.
428///
429/// S - Sharded key subtype.
430/// T - Table to walk over.
431/// C - Cursor implementation.
432///
433/// This function walks the entries from the given start key and deletes all shards that belong to
434/// the key and contain block numbers at or above the given block number. Shards entirely below
435/// the block number are preserved.
436///
437/// The boundary shard (the shard that spans across the block number) is removed from the database.
438/// Any indices that are below the block number are filtered out and returned for reinsertion.
439/// The boundary shard is returned for reinsertion (if it's not empty).
440fn unwind_history_shards<S, T, C>(
441    cursor: &mut C,
442    start_key: T::Key,
443    block_number: BlockNumber,
444    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
445) -> ProviderResult<Vec<u64>>
446where
447    T: Table<Value = BlockNumberList>,
448    T::Key: AsRef<ShardedKey<S>>,
449    C: DbCursorRO<T> + DbCursorRW<T>,
450{
451    // Start from the given key and iterate through shards
452    let mut item = cursor.seek_exact(start_key)?;
453    while let Some((sharded_key, list)) = item {
454        // If the shard does not belong to the key, break.
455        if !shard_belongs_to_key(&sharded_key) {
456            break
457        }
458
459        // Always delete the current shard from the database first
460        // We'll decide later what (if anything) to reinsert
461        cursor.delete_current()?;
462
463        // Get the first (lowest) block number in this shard
464        // All block numbers in a shard are sorted in ascending order
465        let first = list.iter().next().expect("List can't be empty");
466
467        // Case 1: Entire shard is at or above the unwinding point
468        // Keep it deleted (don't return anything for reinsertion)
469        if first >= block_number {
470            item = cursor.prev()?;
471            continue
472        }
473        // Case 2: This is a boundary shard (spans across the unwinding point)
474        // The shard contains some blocks below and some at/above the unwinding point
475        else if block_number <= sharded_key.as_ref().highest_block_number {
476            // Return only the block numbers that are below the unwinding point
477            // These will be reinserted to preserve the historical data
478            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
479        }
480        // Case 3: Entire shard is below the unwinding point
481        // Return all block numbers for reinsertion (preserve entire shard)
482        return Ok(list.iter().collect::<Vec<_>>())
483    }
484
485    // No shards found or all processed
486    Ok(Vec::new())
487}
488
489impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
490    /// Creates a provider with an inner read-only transaction.
491    pub const fn new(
492        tx: TX,
493        chain_spec: Arc<N::ChainSpec>,
494        static_file_provider: StaticFileProvider<N::Primitives>,
495        prune_modes: PruneModes,
496        storage: Arc<N::Storage>,
497    ) -> Self {
498        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
499    }
500
501    /// Consume `DbTx` or `DbTxMut`.
502    pub fn into_tx(self) -> TX {
503        self.tx
504    }
505
506    /// Pass `DbTx` or `DbTxMut` mutable reference.
507    pub const fn tx_mut(&mut self) -> &mut TX {
508        &mut self.tx
509    }
510
511    /// Pass `DbTx` or `DbTxMut` immutable reference.
512    pub const fn tx_ref(&self) -> &TX {
513        &self.tx
514    }
515
516    /// Returns a reference to the chain specification.
517    pub fn chain_spec(&self) -> &N::ChainSpec {
518        &self.chain_spec
519    }
520}
521
522impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
523    fn recovered_block<H, HF, B, BF>(
524        &self,
525        id: BlockHashOrNumber,
526        _transaction_kind: TransactionVariant,
527        header_by_number: HF,
528        construct_block: BF,
529    ) -> ProviderResult<Option<B>>
530    where
531        H: AsRef<HeaderTy<N>>,
532        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
533        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
534    {
535        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
536        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
537
538        // Get the block body
539        //
540        // If the body indices are not found, this means that the transactions either do not exist
541        // in the database yet, or they do exit but are not indexed. If they exist but are not
542        // indexed, we don't have enough information to return the block anyways, so we return
543        // `None`.
544        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
545
546        let tx_range = body.tx_num_range();
547
548        let (transactions, senders) = if tx_range.is_empty() {
549            (vec![], vec![])
550        } else {
551            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
552        };
553
554        let body = self
555            .storage
556            .reader()
557            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
558            .pop()
559            .ok_or(ProviderError::InvalidStorageOutput)?;
560
561        construct_block(header, body, senders)
562    }
563
564    /// Returns a range of blocks from the database.
565    ///
566    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
567    /// construct blocks from the following inputs:
568    ///     – Header
569    ///     - Range of transaction numbers
570    ///     – Ommers
571    ///     – Withdrawals
572    ///     – Senders
573    fn block_range<F, H, HF, R>(
574        &self,
575        range: RangeInclusive<BlockNumber>,
576        headers_range: HF,
577        mut assemble_block: F,
578    ) -> ProviderResult<Vec<R>>
579    where
580        H: AsRef<HeaderTy<N>>,
581        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
582        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
583    {
584        if range.is_empty() {
585            return Ok(Vec::new())
586        }
587
588        let len = range.end().saturating_sub(*range.start()) as usize;
589        let mut blocks = Vec::with_capacity(len);
590
591        let headers = headers_range(range.clone())?;
592
593        // If the body indices are not found, this means that the transactions either do
594        // not exist in the database yet, or they do exit but are
595        // not indexed. If they exist but are not indexed, we don't
596        // have enough information to return the block anyways, so
597        // we skip the block.
598        let present_headers = self
599            .block_body_indices_range(range)?
600            .into_iter()
601            .map(|b| b.tx_num_range())
602            .zip(headers)
603            .collect::<Vec<_>>();
604
605        let mut inputs = Vec::new();
606        for (tx_range, header) in &present_headers {
607            let transactions = if tx_range.is_empty() {
608                Vec::new()
609            } else {
610                self.transactions_by_tx_range(tx_range.clone())?
611            };
612
613            inputs.push((header.as_ref(), transactions));
614        }
615
616        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
617
618        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
619            blocks.push(assemble_block(header, body, tx_range)?);
620        }
621
622        Ok(blocks)
623    }
624
625    /// Returns a range of blocks from the database, along with the senders of each
626    /// transaction in the blocks.
627    ///
628    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
629    /// construct blocks from the following inputs:
630    ///     – Header
631    ///     - Transactions
632    ///     – Ommers
633    ///     – Withdrawals
634    ///     – Senders
635    fn block_with_senders_range<H, HF, B, BF>(
636        &self,
637        range: RangeInclusive<BlockNumber>,
638        headers_range: HF,
639        assemble_block: BF,
640    ) -> ProviderResult<Vec<B>>
641    where
642        H: AsRef<HeaderTy<N>>,
643        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
644        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
645    {
646        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
647
648        self.block_range(range, headers_range, |header, body, tx_range| {
649            let senders = if tx_range.is_empty() {
650                Vec::new()
651            } else {
652                // fetch senders from the senders table
653                let known_senders =
654                    senders_cursor
655                        .walk_range(tx_range.clone())?
656                        .collect::<Result<HashMap<_, _>, _>>()?;
657
658                let mut senders = Vec::with_capacity(body.transactions().len());
659                for (tx_num, tx) in tx_range.zip(body.transactions()) {
660                    match known_senders.get(&tx_num) {
661                        None => {
662                            // recover the sender from the transaction if not found
663                            let sender = tx.recover_signer_unchecked()?;
664                            senders.push(sender);
665                        }
666                        Some(sender) => senders.push(*sender),
667                    }
668                }
669
670                senders
671            };
672
673            assemble_block(header, body, senders)
674        })
675    }
676
677    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
678    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
679    /// account changesets.
680    fn populate_bundle_state<A, S>(
681        &self,
682        account_changeset: Vec<(u64, AccountBeforeTx)>,
683        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
684        plain_accounts_cursor: &mut A,
685        plain_storage_cursor: &mut S,
686    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
687    where
688        A: DbCursorRO<PlainAccountState>,
689        S: DbDupCursorRO<PlainStorageState>,
690    {
691        // iterate previous value and get plain state value to create changeset
692        // Double option around Account represent if Account state is know (first option) and
693        // account is removed (Second Option)
694        let mut state: BundleStateInit = HashMap::default();
695
696        // This is not working for blocks that are not at tip. as plain state is not the last
697        // state of end range. We should rename the functions or add support to access
698        // History state. Accessing history state can be tricky but we are not gaining
699        // anything.
700
701        let mut reverts: RevertsInit = HashMap::default();
702
703        // add account changeset changes
704        for (block_number, account_before) in account_changeset.into_iter().rev() {
705            let AccountBeforeTx { info: old_info, address } = account_before;
706            match state.entry(address) {
707                hash_map::Entry::Vacant(entry) => {
708                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
709                    entry.insert((old_info, new_info, HashMap::default()));
710                }
711                hash_map::Entry::Occupied(mut entry) => {
712                    // overwrite old account state.
713                    entry.get_mut().0 = old_info;
714                }
715            }
716            // insert old info into reverts.
717            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
718        }
719
720        // add storage changeset changes
721        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
722            let BlockNumberAddress((block_number, address)) = block_and_address;
723            // get account state or insert from plain state.
724            let account_state = match state.entry(address) {
725                hash_map::Entry::Vacant(entry) => {
726                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
727                    entry.insert((present_info, present_info, HashMap::default()))
728                }
729                hash_map::Entry::Occupied(entry) => entry.into_mut(),
730            };
731
732            // match storage.
733            match account_state.2.entry(old_storage.key) {
734                hash_map::Entry::Vacant(entry) => {
735                    let new_storage = plain_storage_cursor
736                        .seek_by_key_subkey(address, old_storage.key)?
737                        .filter(|storage| storage.key == old_storage.key)
738                        .unwrap_or_default();
739                    entry.insert((old_storage.value, new_storage.value));
740                }
741                hash_map::Entry::Occupied(mut entry) => {
742                    entry.get_mut().0 = old_storage.value;
743                }
744            };
745
746            reverts
747                .entry(block_number)
748                .or_default()
749                .entry(address)
750                .or_default()
751                .1
752                .push(old_storage);
753        }
754
755        Ok((state, reverts))
756    }
757}
758
759impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
760    /// Load shard and remove it. If list is empty, last shard was full or
761    /// there are no shards at all.
762    fn take_shard<T>(
763        &self,
764        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
765        key: T::Key,
766    ) -> ProviderResult<Vec<u64>>
767    where
768        T: Table<Value = BlockNumberList>,
769    {
770        if let Some((_, list)) = cursor.seek_exact(key)? {
771            // delete old shard so new one can be inserted.
772            cursor.delete_current()?;
773            let list = list.iter().collect::<Vec<_>>();
774            return Ok(list)
775        }
776        Ok(Vec::new())
777    }
778
779    /// Insert history index to the database.
780    ///
781    /// For each updated partial key, this function removes the last shard from
782    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
783    /// inserts the new shards back into the database.
784    ///
785    /// This function is used by history indexing stages.
786    fn append_history_index<P, T>(
787        &self,
788        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
789        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
790    ) -> ProviderResult<()>
791    where
792        P: Copy,
793        T: Table<Value = BlockNumberList>,
794    {
795        let mut cursor = self.tx.cursor_write::<T>()?;
796        for (partial_key, indices) in index_updates {
797            let mut last_shard =
798                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
799            last_shard.extend(indices);
800            // Chunk indices and insert them in shards of N size.
801            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
802            while let Some(list) = chunks.next() {
803                let highest_block_number = if chunks.peek().is_some() {
804                    *list.last().expect("`chunks` does not return empty list")
805                } else {
806                    // Insert last list with `u64::MAX`.
807                    u64::MAX
808                };
809                cursor.insert(
810                    sharded_key_factory(partial_key, highest_block_number),
811                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
812                )?;
813            }
814        }
815        Ok(())
816    }
817}
818
819impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
820    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
821        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
822    }
823}
824
825impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
826    fn changed_accounts_with_range(
827        &self,
828        range: impl RangeBounds<BlockNumber>,
829    ) -> ProviderResult<BTreeSet<Address>> {
830        self.tx
831            .cursor_read::<tables::AccountChangeSets>()?
832            .walk_range(range)?
833            .map(|entry| {
834                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
835            })
836            .collect()
837    }
838
839    fn basic_accounts(
840        &self,
841        iter: impl IntoIterator<Item = Address>,
842    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
843        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
844        Ok(iter
845            .into_iter()
846            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
847            .collect::<Result<Vec<_>, _>>()?)
848    }
849
850    fn changed_accounts_and_blocks_with_range(
851        &self,
852        range: RangeInclusive<BlockNumber>,
853    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
854        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
855
856        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
857            BTreeMap::new(),
858            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
859                let (index, account) = entry?;
860                accounts.entry(account.address).or_default().push(index);
861                Ok(accounts)
862            },
863        )?;
864
865        Ok(account_transitions)
866    }
867}
868
869impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
870    fn storage_changeset(
871        &self,
872        block_number: BlockNumber,
873    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
874        let range = block_number..=block_number;
875        let storage_range = BlockNumberAddress::range(range);
876        self.tx
877            .cursor_dup_read::<tables::StorageChangeSets>()?
878            .walk_range(storage_range)?
879            .map(|result| -> ProviderResult<_> { Ok(result?) })
880            .collect()
881    }
882}
883
884impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
885    fn account_block_changeset(
886        &self,
887        block_number: BlockNumber,
888    ) -> ProviderResult<Vec<AccountBeforeTx>> {
889        let range = block_number..=block_number;
890        self.tx
891            .cursor_read::<tables::AccountChangeSets>()?
892            .walk_range(range)?
893            .map(|result| -> ProviderResult<_> {
894                let (_, account_before) = result?;
895                Ok(account_before)
896            })
897            .collect()
898    }
899
900    fn get_account_before_block(
901        &self,
902        block_number: BlockNumber,
903        address: Address,
904    ) -> ProviderResult<Option<AccountBeforeTx>> {
905        self.tx
906            .cursor_dup_read::<tables::AccountChangeSets>()?
907            .seek_by_key_subkey(block_number, address)?
908            .filter(|acc| acc.address == address)
909            .map(Ok)
910            .transpose()
911    }
912}
913
914impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
915    for DatabaseProvider<TX, N>
916{
917    type Header = HeaderTy<N>;
918
919    fn local_tip_header(
920        &self,
921        highest_uninterrupted_block: BlockNumber,
922    ) -> ProviderResult<SealedHeader<Self::Header>> {
923        let static_file_provider = self.static_file_provider();
924
925        // Make sure Headers static file is at the same height. If it's further, this
926        // input execution was interrupted previously and we need to unwind the static file.
927        let next_static_file_block_num = static_file_provider
928            .get_highest_static_file_block(StaticFileSegment::Headers)
929            .map(|id| id + 1)
930            .unwrap_or_default();
931        let next_block = highest_uninterrupted_block + 1;
932
933        match next_static_file_block_num.cmp(&next_block) {
934            // The node shutdown between an executed static file commit and before the database
935            // commit, so we need to unwind the static files.
936            Ordering::Greater => {
937                let mut static_file_producer =
938                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
939                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
940                // Since this is a database <-> static file inconsistency, we commit the change
941                // straight away.
942                static_file_producer.commit()?
943            }
944            Ordering::Less => {
945                // There's either missing or corrupted files.
946                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
947            }
948            Ordering::Equal => {}
949        }
950
951        let local_head = static_file_provider
952            .sealed_header(highest_uninterrupted_block)?
953            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
954
955        Ok(local_head)
956    }
957}
958
959impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
960    type Header = HeaderTy<N>;
961
962    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
963        if let Some(num) = self.block_number(block_hash)? {
964            Ok(self.header_by_number(num)?)
965        } else {
966            Ok(None)
967        }
968    }
969
970    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
971        self.static_file_provider.header_by_number(num)
972    }
973
974    fn headers_range(
975        &self,
976        range: impl RangeBounds<BlockNumber>,
977    ) -> ProviderResult<Vec<Self::Header>> {
978        self.static_file_provider.headers_range(range)
979    }
980
981    fn sealed_header(
982        &self,
983        number: BlockNumber,
984    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
985        self.static_file_provider.sealed_header(number)
986    }
987
988    fn sealed_headers_while(
989        &self,
990        range: impl RangeBounds<BlockNumber>,
991        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
992    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
993        self.static_file_provider.sealed_headers_while(range, predicate)
994    }
995}
996
997impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
998    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
999        self.static_file_provider.block_hash(number)
1000    }
1001
1002    fn canonical_hashes_range(
1003        &self,
1004        start: BlockNumber,
1005        end: BlockNumber,
1006    ) -> ProviderResult<Vec<B256>> {
1007        self.static_file_provider.canonical_hashes_range(start, end)
1008    }
1009}
1010
1011impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1012    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1013        let best_number = self.best_block_number()?;
1014        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1015        Ok(ChainInfo { best_hash, best_number })
1016    }
1017
1018    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1019        // The best block number is tracked via the finished stage which gets updated in the same tx
1020        // when new blocks committed
1021        Ok(self
1022            .get_stage_checkpoint(StageId::Finish)?
1023            .map(|checkpoint| checkpoint.block_number)
1024            .unwrap_or_default())
1025    }
1026
1027    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1028        self.static_file_provider.last_block_number()
1029    }
1030
1031    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1032        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1033    }
1034}
1035
1036impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1037    type Block = BlockTy<N>;
1038
1039    fn find_block_by_hash(
1040        &self,
1041        hash: B256,
1042        source: BlockSource,
1043    ) -> ProviderResult<Option<Self::Block>> {
1044        if source.is_canonical() {
1045            self.block(hash.into())
1046        } else {
1047            Ok(None)
1048        }
1049    }
1050
1051    /// Returns the block with matching number from database.
1052    ///
1053    /// If the header for this block is not found, this returns `None`.
1054    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1055    /// will return None.
1056    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1057        if let Some(number) = self.convert_hash_or_number(id)? &&
1058            let Some(header) = self.header_by_number(number)?
1059        {
1060            // If the body indices are not found, this means that the transactions either do not
1061            // exist in the database yet, or they do exit but are not indexed.
1062            // If they exist but are not indexed, we don't have enough
1063            // information to return the block anyways, so we return `None`.
1064            let Some(transactions) = self.transactions_by_block(number.into())? else {
1065                return Ok(None)
1066            };
1067
1068            let body = self
1069                .storage
1070                .reader()
1071                .read_block_bodies(self, vec![(&header, transactions)])?
1072                .pop()
1073                .ok_or(ProviderError::InvalidStorageOutput)?;
1074
1075            return Ok(Some(Self::Block::new(header, body)))
1076        }
1077
1078        Ok(None)
1079    }
1080
1081    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1082        Ok(None)
1083    }
1084
1085    fn pending_block_and_receipts(
1086        &self,
1087    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1088        Ok(None)
1089    }
1090
1091    /// Returns the block with senders with matching number or hash from database.
1092    ///
1093    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1094    /// spot, and we want fast querying.**
1095    ///
1096    /// If the header for this block is not found, this returns `None`.
1097    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1098    /// will return None.
1099    fn recovered_block(
1100        &self,
1101        id: BlockHashOrNumber,
1102        transaction_kind: TransactionVariant,
1103    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1104        self.recovered_block(
1105            id,
1106            transaction_kind,
1107            |block_number| self.header_by_number(block_number),
1108            |header, body, senders| {
1109                Self::Block::new(header, body)
1110                    // Note: we're using unchecked here because we know the block contains valid txs
1111                    // wrt to its height and can ignore the s value check so pre
1112                    // EIP-2 txs are allowed
1113                    .try_into_recovered_unchecked(senders)
1114                    .map(Some)
1115                    .map_err(|_| ProviderError::SenderRecoveryError)
1116            },
1117        )
1118    }
1119
1120    fn sealed_block_with_senders(
1121        &self,
1122        id: BlockHashOrNumber,
1123        transaction_kind: TransactionVariant,
1124    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1125        self.recovered_block(
1126            id,
1127            transaction_kind,
1128            |block_number| self.sealed_header(block_number),
1129            |header, body, senders| {
1130                Self::Block::new_sealed(header, body)
1131                    // Note: we're using unchecked here because we know the block contains valid txs
1132                    // wrt to its height and can ignore the s value check so pre
1133                    // EIP-2 txs are allowed
1134                    .try_with_senders_unchecked(senders)
1135                    .map(Some)
1136                    .map_err(|_| ProviderError::SenderRecoveryError)
1137            },
1138        )
1139    }
1140
1141    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1142        self.block_range(
1143            range,
1144            |range| self.headers_range(range),
1145            |header, body, _| Ok(Self::Block::new(header, body)),
1146        )
1147    }
1148
1149    fn block_with_senders_range(
1150        &self,
1151        range: RangeInclusive<BlockNumber>,
1152    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1153        self.block_with_senders_range(
1154            range,
1155            |range| self.headers_range(range),
1156            |header, body, senders| {
1157                Self::Block::new(header, body)
1158                    .try_into_recovered_unchecked(senders)
1159                    .map_err(|_| ProviderError::SenderRecoveryError)
1160            },
1161        )
1162    }
1163
1164    fn recovered_block_range(
1165        &self,
1166        range: RangeInclusive<BlockNumber>,
1167    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1168        self.block_with_senders_range(
1169            range,
1170            |range| self.sealed_headers_range(range),
1171            |header, body, senders| {
1172                Self::Block::new_sealed(header, body)
1173                    .try_with_senders(senders)
1174                    .map_err(|_| ProviderError::SenderRecoveryError)
1175            },
1176        )
1177    }
1178
1179    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1180        Ok(self
1181            .tx
1182            .cursor_read::<tables::TransactionBlocks>()?
1183            .seek(id)
1184            .map(|b| b.map(|(_, bn)| bn))?)
1185    }
1186}
1187
1188impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1189    for DatabaseProvider<TX, N>
1190{
1191    /// Recovers transaction hashes by walking through `Transactions` table and
1192    /// calculating them in a parallel manner. Returned unsorted.
1193    fn transaction_hashes_by_range(
1194        &self,
1195        tx_range: Range<TxNumber>,
1196    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1197        self.static_file_provider.transaction_hashes_by_range(tx_range)
1198    }
1199}
1200
// Transaction queries, answered from the database tables and the static file provider.
1202impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1203    type Transaction = TxTy<N>;
1204
1205    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1206        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1207    }
1208
1209    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1210        self.static_file_provider.transaction_by_id(id)
1211    }
1212
1213    fn transaction_by_id_unhashed(
1214        &self,
1215        id: TxNumber,
1216    ) -> ProviderResult<Option<Self::Transaction>> {
1217        self.static_file_provider.transaction_by_id_unhashed(id)
1218    }
1219
1220    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1221        if let Some(id) = self.transaction_id(hash)? {
1222            Ok(self.transaction_by_id_unhashed(id)?)
1223        } else {
1224            Ok(None)
1225        }
1226    }
1227
1228    fn transaction_by_hash_with_meta(
1229        &self,
1230        tx_hash: TxHash,
1231    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1232        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
1233            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
1234            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
1235            let Some(sealed_header) = self.sealed_header(block_number)?
1236        {
1237            let (header, block_hash) = sealed_header.split();
1238            if let Some(block_body) = self.block_body_indices(block_number)? {
1239                // the index of the tx in the block is the offset:
1240                // len([start..tx_id])
1241                // NOTE: `transaction_id` is always `>=` the block's first
1242                // index
1243                let index = transaction_id - block_body.first_tx_num();
1244
1245                let meta = TransactionMeta {
1246                    tx_hash,
1247                    index,
1248                    block_hash,
1249                    block_number,
1250                    base_fee: header.base_fee_per_gas(),
1251                    excess_blob_gas: header.excess_blob_gas(),
1252                    timestamp: header.timestamp(),
1253                };
1254
1255                return Ok(Some((transaction, meta)))
1256            }
1257        }
1258
1259        Ok(None)
1260    }
1261
1262    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1263        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1264        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1265    }
1266
1267    fn transactions_by_block(
1268        &self,
1269        id: BlockHashOrNumber,
1270    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1271        if let Some(block_number) = self.convert_hash_or_number(id)? &&
1272            let Some(body) = self.block_body_indices(block_number)?
1273        {
1274            let tx_range = body.tx_num_range();
1275            return if tx_range.is_empty() {
1276                Ok(Some(Vec::new()))
1277            } else {
1278                self.transactions_by_tx_range(tx_range).map(Some)
1279            }
1280        }
1281        Ok(None)
1282    }
1283
1284    fn transactions_by_block_range(
1285        &self,
1286        range: impl RangeBounds<BlockNumber>,
1287    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1288        let range = to_range(range);
1289
1290        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1291            .into_iter()
1292            .map(|body| {
1293                let tx_num_range = body.tx_num_range();
1294                if tx_num_range.is_empty() {
1295                    Ok(Vec::new())
1296                } else {
1297                    self.transactions_by_tx_range(tx_num_range)
1298                }
1299            })
1300            .collect()
1301    }
1302
1303    fn transactions_by_tx_range(
1304        &self,
1305        range: impl RangeBounds<TxNumber>,
1306    ) -> ProviderResult<Vec<Self::Transaction>> {
1307        self.static_file_provider.transactions_by_tx_range(range)
1308    }
1309
1310    fn senders_by_tx_range(
1311        &self,
1312        range: impl RangeBounds<TxNumber>,
1313    ) -> ProviderResult<Vec<Address>> {
1314        self.cursor_read_collect::<tables::TransactionSenders>(range)
1315    }
1316
1317    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1318        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1319    }
1320}
1321
1322impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1323    type Receipt = ReceiptTy<N>;
1324
1325    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1326        self.static_file_provider.get_with_static_file_or_database(
1327            StaticFileSegment::Receipts,
1328            id,
1329            |static_file| static_file.receipt(id),
1330            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1331        )
1332    }
1333
1334    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1335        if let Some(id) = self.transaction_id(hash)? {
1336            self.receipt(id)
1337        } else {
1338            Ok(None)
1339        }
1340    }
1341
1342    fn receipts_by_block(
1343        &self,
1344        block: BlockHashOrNumber,
1345    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1346        if let Some(number) = self.convert_hash_or_number(block)? &&
1347            let Some(body) = self.block_body_indices(number)?
1348        {
1349            let tx_range = body.tx_num_range();
1350            return if tx_range.is_empty() {
1351                Ok(Some(Vec::new()))
1352            } else {
1353                self.receipts_by_tx_range(tx_range).map(Some)
1354            }
1355        }
1356        Ok(None)
1357    }
1358
1359    fn receipts_by_tx_range(
1360        &self,
1361        range: impl RangeBounds<TxNumber>,
1362    ) -> ProviderResult<Vec<Self::Receipt>> {
1363        self.static_file_provider.get_range_with_static_file_or_database(
1364            StaticFileSegment::Receipts,
1365            to_range(range),
1366            |static_file, range, _| static_file.receipts_by_tx_range(range),
1367            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1368            |_| true,
1369        )
1370    }
1371
1372    fn receipts_by_block_range(
1373        &self,
1374        block_range: RangeInclusive<BlockNumber>,
1375    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1376        if block_range.is_empty() {
1377            return Ok(Vec::new());
1378        }
1379
1380        // collect block body indices for each block in the range
1381        let mut block_body_indices = Vec::new();
1382        for block_num in block_range {
1383            if let Some(indices) = self.block_body_indices(block_num)? {
1384                block_body_indices.push(indices);
1385            } else {
1386                // use default indices for missing blocks (empty block)
1387                block_body_indices.push(StoredBlockBodyIndices::default());
1388            }
1389        }
1390
1391        if block_body_indices.is_empty() {
1392            return Ok(Vec::new());
1393        }
1394
1395        // find blocks with transactions to determine transaction range
1396        let non_empty_blocks: Vec<_> =
1397            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1398
1399        if non_empty_blocks.is_empty() {
1400            // all blocks are empty
1401            return Ok(vec![Vec::new(); block_body_indices.len()]);
1402        }
1403
1404        // calculate the overall transaction range
1405        let first_tx = non_empty_blocks[0].first_tx_num();
1406        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1407
1408        // fetch all receipts in the transaction range
1409        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1410        let mut receipts_iter = all_receipts.into_iter();
1411
1412        // distribute receipts to their respective blocks
1413        let mut result = Vec::with_capacity(block_body_indices.len());
1414        for indices in &block_body_indices {
1415            if indices.tx_count == 0 {
1416                result.push(Vec::new());
1417            } else {
1418                let block_receipts =
1419                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1420                result.push(block_receipts);
1421            }
1422        }
1423
1424        Ok(result)
1425    }
1426}
1427
1428impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1429    for DatabaseProvider<TX, N>
1430{
1431    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1432        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1433    }
1434
1435    fn block_body_indices_range(
1436        &self,
1437        range: RangeInclusive<BlockNumber>,
1438    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1439        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1440    }
1441}
1442
1443impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1444    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1445        Ok(if let Some(encoded) = id.get_pre_encoded() {
1446            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1447        } else {
1448            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1449        })
1450    }
1451
1452    /// Get stage checkpoint progress.
1453    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1454        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1455    }
1456
1457    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1458        self.tx
1459            .cursor_read::<tables::StageCheckpoints>()?
1460            .walk(None)?
1461            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1462            .map_err(ProviderError::Database)
1463    }
1464}
1465
1466impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1467    /// Save stage checkpoint.
1468    fn save_stage_checkpoint(
1469        &self,
1470        id: StageId,
1471        checkpoint: StageCheckpoint,
1472    ) -> ProviderResult<()> {
1473        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1474    }
1475
1476    /// Save stage checkpoint progress.
1477    fn save_stage_checkpoint_progress(
1478        &self,
1479        id: StageId,
1480        checkpoint: Vec<u8>,
1481    ) -> ProviderResult<()> {
1482        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1483    }
1484
1485    fn update_pipeline_stages(
1486        &self,
1487        block_number: BlockNumber,
1488        drop_stage_checkpoint: bool,
1489    ) -> ProviderResult<()> {
1490        // iterate over all existing stages in the table and update its progress.
1491        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1492        for stage_id in StageId::ALL {
1493            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1494            cursor.upsert(
1495                stage_id.to_string(),
1496                &StageCheckpoint {
1497                    block_number,
1498                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1499                },
1500            )?;
1501        }
1502
1503        Ok(())
1504    }
1505}
1506
1507impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1508    fn plain_state_storages(
1509        &self,
1510        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1511    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1512        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1513
1514        addresses_with_keys
1515            .into_iter()
1516            .map(|(address, storage)| {
1517                storage
1518                    .into_iter()
1519                    .map(|key| -> ProviderResult<_> {
1520                        Ok(plain_storage
1521                            .seek_by_key_subkey(address, key)?
1522                            .filter(|v| v.key == key)
1523                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1524                    })
1525                    .collect::<ProviderResult<Vec<_>>>()
1526                    .map(|storage| (address, storage))
1527            })
1528            .collect::<ProviderResult<Vec<(_, _)>>>()
1529    }
1530
1531    fn changed_storages_with_range(
1532        &self,
1533        range: RangeInclusive<BlockNumber>,
1534    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1535        self.tx
1536            .cursor_read::<tables::StorageChangeSets>()?
1537            .walk_range(BlockNumberAddress::range(range))?
1538            // fold all storages and save its old state so we can remove it from HashedStorage
1539            // it is needed as it is dup table.
1540            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1541                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1542                accounts.entry(address).or_default().insert(storage_entry.key);
1543                Ok(accounts)
1544            })
1545    }
1546
1547    fn changed_storages_and_blocks_with_range(
1548        &self,
1549        range: RangeInclusive<BlockNumber>,
1550    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1551        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1552
1553        let storage_changeset_lists =
1554            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1555                BTreeMap::new(),
1556                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1557                    let (index, storage) = entry?;
1558                    storages
1559                        .entry((index.address(), storage.key))
1560                        .or_default()
1561                        .push(index.block_number());
1562                    Ok(storages)
1563                },
1564            )?;
1565
1566        Ok(storage_changeset_lists)
1567    }
1568}
1569
1570impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1571    for DatabaseProvider<TX, N>
1572{
1573    type Receipt = ReceiptTy<N>;
1574
1575    fn write_state(
1576        &self,
1577        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1578        is_value_known: OriginalValuesKnown,
1579    ) -> ProviderResult<()> {
1580        let first_block = execution_outcome.first_block();
1581        let block_count = execution_outcome.len() as u64;
1582        let last_block = execution_outcome.last_block();
1583        let block_range = first_block..=last_block;
1584
1585        let tip = self.last_block_number()?.max(last_block);
1586
1587        let (plain_state, reverts) =
1588            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1589
1590        self.write_state_reverts(reverts, first_block)?;
1591        self.write_state_changes(plain_state)?;
1592
1593        // Fetch the first transaction number for each block in the range
1594        let block_indices: Vec<_> = self
1595            .block_body_indices_range(block_range)?
1596            .into_iter()
1597            .map(|b| b.first_tx_num)
1598            .collect();
1599
1600        // Ensure all expected blocks are present.
1601        if block_indices.len() < block_count as usize {
1602            let missing_blocks = block_count - block_indices.len() as u64;
1603            return Err(ProviderError::BlockBodyIndicesNotFound(
1604                last_block.saturating_sub(missing_blocks - 1),
1605            ));
1606        }
1607
1608        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1609
1610        // Prepare receipts cursor if we are going to write receipts to the database
1611        //
1612        // We are writing to database if requested or if there's any kind of receipt pruning
1613        // configured
1614        let mut receipts_cursor = self.tx.cursor_write::<tables::Receipts<Self::Receipt>>()?;
1615
1616        // Prepare receipts static writer if we are going to write receipts to static files
1617        //
1618        // We are writing to static files if requested and if there's no receipt pruning configured
1619        let mut receipts_static_writer = has_receipts_pruning
1620            .not()
1621            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1622            .transpose()?;
1623
1624        // All receipts from the last 128 blocks are required for blockchain tree, even with
1625        // [`PruneSegment::ContractLogs`].
1626        let prunable_receipts =
1627            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1628
1629        for (idx, (receipts, first_tx_index)) in
1630            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1631        {
1632            let block_number = first_block + idx as u64;
1633
1634            // Increment block number for receipts static file writer
1635            if let Some(writer) = receipts_static_writer.as_mut() {
1636                writer.increment_block(block_number)?;
1637            }
1638
1639            // Skip writing receipts if pruning configuration requires us to.
1640            if prunable_receipts &&
1641                self.prune_modes
1642                    .receipts
1643                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1644            {
1645                continue
1646            }
1647
1648            for (idx, receipt) in receipts.iter().enumerate() {
1649                let receipt_idx = first_tx_index + idx as u64;
1650
1651                if let Some(writer) = &mut receipts_static_writer {
1652                    writer.append_receipt(receipt_idx, receipt)?;
1653                } else {
1654                    receipts_cursor.append(receipt_idx, receipt)?;
1655                }
1656            }
1657        }
1658
1659        Ok(())
1660    }
1661
1662    fn write_state_reverts(
1663        &self,
1664        reverts: PlainStateReverts,
1665        first_block: BlockNumber,
1666    ) -> ProviderResult<()> {
1667        // Write storage changes
1668        tracing::trace!("Writing storage changes");
1669        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1670        let mut storage_changeset_cursor =
1671            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1672        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1673            let block_number = first_block + block_index as BlockNumber;
1674
1675            tracing::trace!(block_number, "Writing block change");
1676            // sort changes by address.
1677            storage_changes.par_sort_unstable_by_key(|a| a.address);
1678            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1679                let storage_id = BlockNumberAddress((block_number, address));
1680
1681                let mut storage = storage_revert
1682                    .into_iter()
1683                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1684                    .collect::<Vec<_>>();
1685                // sort storage slots by key.
1686                storage.par_sort_unstable_by_key(|a| a.0);
1687
1688                // If we are writing the primary storage wipe transition, the pre-existing plain
1689                // storage state has to be taken from the database and written to storage history.
1690                // See [StorageWipe::Primary] for more details.
1691                //
1692                // TODO(mediocregopher): This could be rewritten in a way which doesn't require
1693                // collecting wiped entries into a Vec like this, see
1694                // `write_storage_trie_changesets`.
1695                let mut wiped_storage = Vec::new();
1696                if wiped {
1697                    tracing::trace!(?address, "Wiping storage");
1698                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1699                        wiped_storage.push((entry.key, entry.value));
1700                        while let Some(entry) = storages_cursor.next_dup_val()? {
1701                            wiped_storage.push((entry.key, entry.value))
1702                        }
1703                    }
1704                }
1705
1706                tracing::trace!(?address, ?storage, "Writing storage reverts");
1707                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1708                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1709                }
1710            }
1711        }
1712
1713        // Write account changes
1714        tracing::trace!("Writing account changes");
1715        let mut account_changeset_cursor =
1716            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1717
1718        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1719            let block_number = first_block + block_index as BlockNumber;
1720            // Sort accounts by address.
1721            account_block_reverts.par_sort_by_key(|a| a.0);
1722
1723            for (address, info) in account_block_reverts {
1724                account_changeset_cursor.append_dup(
1725                    block_number,
1726                    AccountBeforeTx { address, info: info.map(Into::into) },
1727                )?;
1728            }
1729        }
1730
1731        Ok(())
1732    }
1733
1734    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1735        // sort all entries so they can be written to database in more performant way.
1736        // and take smaller memory footprint.
1737        changes.accounts.par_sort_by_key(|a| a.0);
1738        changes.storage.par_sort_by_key(|a| a.address);
1739        changes.contracts.par_sort_by_key(|a| a.0);
1740
1741        // Write new account state
1742        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1743        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1744        // write account to database.
1745        for (address, account) in changes.accounts {
1746            if let Some(account) = account {
1747                tracing::trace!(?address, "Updating plain state account");
1748                accounts_cursor.upsert(address, &account.into())?;
1749            } else if accounts_cursor.seek_exact(address)?.is_some() {
1750                tracing::trace!(?address, "Deleting plain state account");
1751                accounts_cursor.delete_current()?;
1752            }
1753        }
1754
1755        // Write bytecode
1756        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1757        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1758        for (hash, bytecode) in changes.contracts {
1759            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1760        }
1761
1762        // Write new storage state and wipe storage if needed.
1763        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1764        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1765        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1766            // Wiping of storage.
1767            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1768                storages_cursor.delete_current_duplicates()?;
1769            }
1770            // cast storages to B256.
1771            let mut storage = storage
1772                .into_iter()
1773                .map(|(k, value)| StorageEntry { key: k.into(), value })
1774                .collect::<Vec<_>>();
1775            // sort storage slots by key.
1776            storage.par_sort_unstable_by_key(|a| a.key);
1777
1778            for entry in storage {
1779                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1780                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1781                    db_entry.key == entry.key
1782                {
1783                    storages_cursor.delete_current()?;
1784                }
1785
1786                if !entry.value.is_zero() {
1787                    storages_cursor.upsert(address, &entry)?;
1788                }
1789            }
1790        }
1791
1792        Ok(())
1793    }
1794
1795    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1796        // Write hashed account updates.
1797        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1798        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
1799            if let Some(account) = account {
1800                hashed_accounts_cursor.upsert(hashed_address, &account)?;
1801            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
1802                hashed_accounts_cursor.delete_current()?;
1803            }
1804        }
1805
1806        // Write hashed storage changes.
1807        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1808        let mut hashed_storage_cursor =
1809            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1810        for (hashed_address, storage) in sorted_storages {
1811            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1812                hashed_storage_cursor.delete_current_duplicates()?;
1813            }
1814
1815            for (hashed_slot, value) in storage.storage_slots_sorted() {
1816                let entry = StorageEntry { key: hashed_slot, value };
1817                if let Some(db_entry) =
1818                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1819                    db_entry.key == entry.key
1820                {
1821                    hashed_storage_cursor.delete_current()?;
1822                }
1823
1824                if !entry.value.is_zero() {
1825                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1826                }
1827            }
1828        }
1829
1830        Ok(())
1831    }
1832
1833    /// Remove the last N blocks of state.
1834    ///
1835    /// The latest state will be unwound
1836    ///
1837    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1838    ///    transaction ids.
1839    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1840    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1841    ///    the changesets.
1842    ///    - In order to have both the old and new values in the changesets, we also access the
1843    ///      plain state tables.
1844    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1845    ///    we:
1846    ///     1. Take the old value from the changeset
1847    ///     2. Take the new value from the plain state
1848    ///     3. Save the old value to the local state
1849    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1850    ///    have seen before we:
1851    ///     1. Take the old value from the changeset
1852    ///     2. Take the new value from the local state
1853    ///     3. Set the local state to the value in the changeset
1854    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1855        let range = block + 1..=self.last_block_number()?;
1856
1857        if range.is_empty() {
1858            return Ok(());
1859        }
1860
1861        // We are not removing block meta as it is used to get block changesets.
1862        let block_bodies = self.block_body_indices_range(range.clone())?;
1863
1864        // get transaction receipts
1865        let from_transaction_num =
1866            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1867
1868        let storage_range = BlockNumberAddress::range(range.clone());
1869
1870        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1871        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1872
1873        // This is not working for blocks that are not at tip. as plain state is not the last
1874        // state of end range. We should rename the functions or add support to access
1875        // History state. Accessing history state can be tricky but we are not gaining
1876        // anything.
1877        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1878        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1879
1880        let (state, _) = self.populate_bundle_state(
1881            account_changeset,
1882            storage_changeset,
1883            &mut plain_accounts_cursor,
1884            &mut plain_storage_cursor,
1885        )?;
1886
1887        // iterate over local plain state remove all account and all storages.
1888        for (address, (old_account, new_account, storage)) in &state {
1889            // revert account if needed.
1890            if old_account != new_account {
1891                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1892                if let Some(account) = old_account {
1893                    plain_accounts_cursor.upsert(*address, account)?;
1894                } else if existing_entry.is_some() {
1895                    plain_accounts_cursor.delete_current()?;
1896                }
1897            }
1898
1899            // revert storages
1900            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1901                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1902                // delete previous value
1903                if plain_storage_cursor
1904                    .seek_by_key_subkey(*address, *storage_key)?
1905                    .filter(|s| s.key == *storage_key)
1906                    .is_some()
1907                {
1908                    plain_storage_cursor.delete_current()?
1909                }
1910
1911                // insert value if needed
1912                if !old_storage_value.is_zero() {
1913                    plain_storage_cursor.upsert(*address, &storage_entry)?;
1914                }
1915            }
1916        }
1917
1918        self.remove_receipts_from(from_transaction_num, block)?;
1919
1920        Ok(())
1921    }
1922
1923    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
1924    ///
1925    /// The latest state will be unwound and returned back with all the blocks
1926    ///
1927    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1928    ///    transaction ids.
1929    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1930    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1931    ///    the changesets.
1932    ///    - In order to have both the old and new values in the changesets, we also access the
1933    ///      plain state tables.
1934    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1935    ///    we:
1936    ///     1. Take the old value from the changeset
1937    ///     2. Take the new value from the plain state
1938    ///     3. Save the old value to the local state
1939    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1940    ///    have seen before we:
1941    ///     1. Take the old value from the changeset
1942    ///     2. Take the new value from the local state
1943    ///     3. Set the local state to the value in the changeset
1944    fn take_state_above(
1945        &self,
1946        block: BlockNumber,
1947    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
1948        let range = block + 1..=self.last_block_number()?;
1949
1950        if range.is_empty() {
1951            return Ok(ExecutionOutcome::default())
1952        }
1953        let start_block_number = *range.start();
1954
1955        // We are not removing block meta as it is used to get block changesets.
1956        let block_bodies = self.block_body_indices_range(range.clone())?;
1957
1958        // get transaction receipts
1959        let from_transaction_num =
1960            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1961        let to_transaction_num =
1962            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
1963
1964        let storage_range = BlockNumberAddress::range(range.clone());
1965
1966        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1967        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1968
1969        // This is not working for blocks that are not at tip. as plain state is not the last
1970        // state of end range. We should rename the functions or add support to access
1971        // History state. Accessing history state can be tricky but we are not gaining
1972        // anything.
1973        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1974        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1975
1976        // populate bundle state and reverts from changesets / state cursors, to iterate over,
1977        // remove, and return later
1978        let (state, reverts) = self.populate_bundle_state(
1979            account_changeset,
1980            storage_changeset,
1981            &mut plain_accounts_cursor,
1982            &mut plain_storage_cursor,
1983        )?;
1984
1985        // iterate over local plain state remove all account and all storages.
1986        for (address, (old_account, new_account, storage)) in &state {
1987            // revert account if needed.
1988            if old_account != new_account {
1989                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1990                if let Some(account) = old_account {
1991                    plain_accounts_cursor.upsert(*address, account)?;
1992                } else if existing_entry.is_some() {
1993                    plain_accounts_cursor.delete_current()?;
1994                }
1995            }
1996
1997            // revert storages
1998            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1999                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2000                // delete previous value
2001                if plain_storage_cursor
2002                    .seek_by_key_subkey(*address, *storage_key)?
2003                    .filter(|s| s.key == *storage_key)
2004                    .is_some()
2005                {
2006                    plain_storage_cursor.delete_current()?
2007                }
2008
2009                // insert value if needed
2010                if !old_storage_value.is_zero() {
2011                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2012                }
2013            }
2014        }
2015
2016        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2017        let mut receipts_iter = self
2018            .static_file_provider
2019            .get_range_with_static_file_or_database(
2020                StaticFileSegment::Receipts,
2021                from_transaction_num..to_transaction_num + 1,
2022                |static_file, range, _| {
2023                    static_file
2024                        .receipts_by_tx_range(range.clone())
2025                        .map(|r| range.into_iter().zip(r).collect())
2026                },
2027                |range, _| {
2028                    self.tx
2029                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2030                        .walk_range(range)?
2031                        .map(|r| r.map_err(Into::into))
2032                        .collect()
2033                },
2034                |_| true,
2035            )?
2036            .into_iter()
2037            .peekable();
2038
2039        let mut receipts = Vec::with_capacity(block_bodies.len());
2040        // loop break if we are at the end of the blocks.
2041        for block_body in block_bodies {
2042            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2043            for num in block_body.tx_num_range() {
2044                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2045                    block_receipts.push(receipts_iter.next().unwrap().1);
2046                }
2047            }
2048            receipts.push(block_receipts);
2049        }
2050
2051        self.remove_receipts_from(from_transaction_num, block)?;
2052
2053        Ok(ExecutionOutcome::new_init(
2054            state,
2055            reverts,
2056            Vec::new(),
2057            receipts,
2058            start_block_number,
2059            Vec::new(),
2060        ))
2061    }
2062}
2063
2064impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2065    /// Writes trie updates to the database with already sorted updates.
2066    ///
2067    /// Returns the number of entries modified.
2068    fn write_trie_updates_sorted(&self, trie_updates: &TrieUpdatesSorted) -> ProviderResult<usize> {
2069        if trie_updates.is_empty() {
2070            return Ok(0)
2071        }
2072
2073        // Track the number of inserted entries.
2074        let mut num_entries = 0;
2075
2076        let tx = self.tx_ref();
2077        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2078
2079        // Process sorted account nodes
2080        for (key, updated_node) in trie_updates.account_nodes_ref() {
2081            let nibbles = StoredNibbles(*key);
2082            match updated_node {
2083                Some(node) => {
2084                    if !nibbles.0.is_empty() {
2085                        num_entries += 1;
2086                        account_trie_cursor.upsert(nibbles, node)?;
2087                    }
2088                }
2089                None => {
2090                    num_entries += 1;
2091                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2092                        account_trie_cursor.delete_current()?;
2093                    }
2094                }
2095            }
2096        }
2097
2098        num_entries +=
2099            self.write_storage_trie_updates_sorted(trie_updates.storage_tries_ref().iter())?;
2100
2101        Ok(num_entries)
2102    }
2103
2104    /// Records the current values of all trie nodes which will be updated using the `TrieUpdates`
2105    /// into the trie changesets tables.
2106    ///
2107    /// The intended usage of this method is to call it _prior_ to calling `write_trie_updates` with
2108    /// the same `TrieUpdates`.
2109    ///
2110    /// Returns the number of keys written.
2111    fn write_trie_changesets(
2112        &self,
2113        block_number: BlockNumber,
2114        trie_updates: &TrieUpdatesSorted,
2115        updates_overlay: Option<&TrieUpdatesSorted>,
2116    ) -> ProviderResult<usize> {
2117        let mut num_entries = 0;
2118
2119        let mut changeset_cursor =
2120            self.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2121        let curr_values_cursor = self.tx_ref().cursor_read::<tables::AccountsTrie>()?;
2122
2123        // Wrap the cursor in DatabaseAccountTrieCursor
2124        let mut db_account_cursor = DatabaseAccountTrieCursor::new(curr_values_cursor);
2125
2126        // Static empty array for when updates_overlay is None
2127        static EMPTY_ACCOUNT_UPDATES: Vec<(Nibbles, Option<BranchNodeCompact>)> = Vec::new();
2128
2129        // Get the overlay updates for account trie, or use an empty array
2130        let account_overlay_updates = updates_overlay
2131            .map(|overlay| overlay.account_nodes_ref())
2132            .unwrap_or(&EMPTY_ACCOUNT_UPDATES);
2133
2134        // Wrap the cursor in InMemoryTrieCursor with the overlay
2135        let mut in_memory_account_cursor =
2136            InMemoryTrieCursor::new(Some(&mut db_account_cursor), account_overlay_updates);
2137
2138        for (path, _) in trie_updates.account_nodes_ref() {
2139            num_entries += 1;
2140            let node = in_memory_account_cursor.seek_exact(*path)?.map(|(_, node)| node);
2141            changeset_cursor.append_dup(
2142                block_number,
2143                TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(*path), node },
2144            )?;
2145        }
2146
2147        let mut storage_updates = trie_updates.storage_tries_ref().iter().collect::<Vec<_>>();
2148        storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2149
2150        num_entries += self.write_storage_trie_changesets(
2151            block_number,
2152            storage_updates.into_iter(),
2153            updates_overlay,
2154        )?;
2155
2156        Ok(num_entries)
2157    }
2158
2159    fn clear_trie_changesets(&self) -> ProviderResult<()> {
2160        let tx = self.tx_ref();
2161        tx.clear::<tables::AccountsTrieChangeSets>()?;
2162        tx.clear::<tables::StoragesTrieChangeSets>()?;
2163        Ok(())
2164    }
2165
2166    fn clear_trie_changesets_from(&self, from: BlockNumber) -> ProviderResult<()> {
2167        let tx = self.tx_ref();
2168        {
2169            let range = from..;
2170            let mut cursor = tx.cursor_dup_write::<tables::AccountsTrieChangeSets>()?;
2171            let mut walker = cursor.walk_range(range)?;
2172
2173            while walker.next().transpose()?.is_some() {
2174                walker.delete_current()?;
2175            }
2176        }
2177
2178        {
2179            let range: RangeFrom<BlockNumberHashedAddress> = (from, B256::ZERO).into()..;
2180            let mut cursor = tx.cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2181            let mut walker = cursor.walk_range(range)?;
2182
2183            while walker.next().transpose()?.is_some() {
2184                walker.delete_current()?;
2185            }
2186        }
2187
2188        Ok(())
2189    }
2190}
2191
2192impl<TX: DbTx + 'static, N: NodeTypes> TrieReader for DatabaseProvider<TX, N> {
2193    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
2194        let tx = self.tx_ref();
2195
2196        // Read account trie changes directly into a Vec - data is already sorted by nibbles
2197        // within each block, and we want the oldest (first) version of each node sorted by path.
2198        let mut account_nodes = Vec::new();
2199        let mut seen_account_keys = HashSet::new();
2200        let mut accounts_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2201
2202        for entry in accounts_cursor.walk_range(from..)? {
2203            let (_, TrieChangeSetsEntry { nibbles, node }) = entry?;
2204            // Only keep the first (oldest) version of each node
2205            if seen_account_keys.insert(nibbles.0) {
2206                account_nodes.push((nibbles.0, node));
2207            }
2208        }
2209
2210        account_nodes.sort_by_key(|(path, _)| *path);
2211
2212        // Read storage trie changes - data is sorted by (block, hashed_address, nibbles)
2213        // Keep track of seen (address, nibbles) pairs to only keep the oldest version per address,
2214        // sorted by path.
2215        let mut storage_tries = B256Map::<Vec<_>>::default();
2216        let mut seen_storage_keys = HashSet::new();
2217        let mut storages_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2218
2219        // Create storage range starting from `from` block
2220        let storage_range_start = BlockNumberHashedAddress((from, B256::ZERO));
2221
2222        for entry in storages_cursor.walk_range(storage_range_start..)? {
2223            let (
2224                BlockNumberHashedAddress((_, hashed_address)),
2225                TrieChangeSetsEntry { nibbles, node },
2226            ) = entry?;
2227
2228            // Only keep the first (oldest) version of each node for this address
2229            if seen_storage_keys.insert((hashed_address, nibbles.0)) {
2230                storage_tries.entry(hashed_address).or_default().push((nibbles.0, node));
2231            }
2232        }
2233
2234        // Convert to StorageTrieUpdatesSorted
2235        let storage_tries = storage_tries
2236            .into_iter()
2237            .map(|(address, mut nodes)| {
2238                nodes.sort_by_key(|(path, _)| *path);
2239                (address, StorageTrieUpdatesSorted { storage_nodes: nodes, is_deleted: false })
2240            })
2241            .collect();
2242
2243        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2244    }
2245
2246    fn get_block_trie_updates(
2247        &self,
2248        block_number: BlockNumber,
2249    ) -> ProviderResult<TrieUpdatesSorted> {
2250        let tx = self.tx_ref();
2251
2252        // Step 1: Get the trie reverts for the state after the target block
2253        let reverts = self.trie_reverts(block_number + 1)?;
2254
2255        // Step 2: Create an InMemoryTrieCursorFactory with the reverts
2256        // This gives us the trie state as it was after the target block was processed
2257        let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
2258        let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);
2259
2260        // Step 3: Collect all account trie nodes that changed in the target block
2261        let mut account_nodes = Vec::new();
2262
2263        // Walk through all account trie changes for this block
2264        let mut accounts_trie_cursor = tx.cursor_dup_read::<tables::AccountsTrieChangeSets>()?;
2265        let mut account_cursor = cursor_factory.account_trie_cursor()?;
2266
2267        for entry in accounts_trie_cursor.walk_dup(Some(block_number), None)? {
2268            let (_, TrieChangeSetsEntry { nibbles, .. }) = entry?;
2269            // Look up the current value of this trie node using the overlay cursor
2270            let node_value = account_cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2271            account_nodes.push((nibbles.0, node_value));
2272        }
2273
2274        // Step 4: Collect all storage trie nodes that changed in the target block
2275        let mut storage_tries = B256Map::default();
2276        let mut storages_trie_cursor = tx.cursor_dup_read::<tables::StoragesTrieChangeSets>()?;
2277        let storage_range_start = BlockNumberHashedAddress((block_number, B256::ZERO));
2278        let storage_range_end = BlockNumberHashedAddress((block_number + 1, B256::ZERO));
2279
2280        let mut current_hashed_address = None;
2281        let mut storage_cursor = None;
2282
2283        for entry in storages_trie_cursor.walk_range(storage_range_start..storage_range_end)? {
2284            let (
2285                BlockNumberHashedAddress((_, hashed_address)),
2286                TrieChangeSetsEntry { nibbles, .. },
2287            ) = entry?;
2288
2289            // Check if we need to create a new storage cursor for a different account
2290            if current_hashed_address != Some(hashed_address) {
2291                storage_cursor = Some(cursor_factory.storage_trie_cursor(hashed_address)?);
2292                current_hashed_address = Some(hashed_address);
2293            }
2294
2295            // Look up the current value of this storage trie node
2296            let cursor =
2297                storage_cursor.as_mut().expect("storage_cursor was just initialized above");
2298            let node_value = cursor.seek_exact(nibbles.0)?.map(|(_, node)| node);
2299            storage_tries
2300                .entry(hashed_address)
2301                .or_insert_with(|| StorageTrieUpdatesSorted {
2302                    storage_nodes: Vec::new(),
2303                    is_deleted: false,
2304                })
2305                .storage_nodes
2306                .push((nibbles.0, node_value));
2307        }
2308
2309        Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
2310    }
2311}
2312
2313impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2314    /// Writes storage trie updates from the given storage trie map with already sorted updates.
2315    ///
2316    /// Expects the storage trie updates to already be sorted by the hashed address key.
2317    ///
2318    /// Returns the number of entries modified.
2319    fn write_storage_trie_updates_sorted<'a>(
2320        &self,
2321        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2322    ) -> ProviderResult<usize> {
2323        let mut num_entries = 0;
2324        let mut storage_tries = storage_tries.collect::<Vec<_>>();
2325        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2326        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2327        for (hashed_address, storage_trie_updates) in storage_tries {
2328            let mut db_storage_trie_cursor =
2329                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2330            num_entries +=
2331                db_storage_trie_cursor.write_storage_trie_updates_sorted(storage_trie_updates)?;
2332            cursor = db_storage_trie_cursor.cursor;
2333        }
2334
2335        Ok(num_entries)
2336    }
2337
2338    /// Records the current values of all trie nodes which will be updated using the
2339    /// `StorageTrieUpdates` into the storage trie changesets table.
2340    ///
2341    /// The intended usage of this method is to call it _prior_ to calling
2342    /// `write_storage_trie_updates` with the same set of `StorageTrieUpdates`.
2343    ///
2344    /// Returns the number of keys written.
2345    fn write_storage_trie_changesets<'a>(
2346        &self,
2347        block_number: BlockNumber,
2348        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdatesSorted)>,
2349        updates_overlay: Option<&TrieUpdatesSorted>,
2350    ) -> ProviderResult<usize> {
2351        let mut num_written = 0;
2352
2353        let mut changeset_cursor =
2354            self.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>()?;
2355
2356        // We hold two cursors to the same table because we use them simultaneously when an
2357        // account's storage is wiped. We keep them outside the for-loop so they can be re-used
2358        // between accounts.
2359        let changed_curr_values_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2360        let wiped_nodes_cursor = self.tx_ref().cursor_dup_read::<tables::StoragesTrie>()?;
2361
2362        // DatabaseStorageTrieCursor requires ownership of the cursor. The easiest way to deal with
2363        // this is to create this outer variable with an initial dummy account, and overwrite it on
2364        // every loop for every real account.
2365        let mut changed_curr_values_cursor = DatabaseStorageTrieCursor::new(
2366            changed_curr_values_cursor,
2367            B256::default(), // Will be set per iteration
2368        );
2369        let mut wiped_nodes_cursor = DatabaseStorageTrieCursor::new(
2370            wiped_nodes_cursor,
2371            B256::default(), // Will be set per iteration
2372        );
2373
2374        // Static empty array for when updates_overlay is None
2375        static EMPTY_UPDATES: Vec<(Nibbles, Option<BranchNodeCompact>)> = Vec::new();
2376
2377        for (hashed_address, storage_trie_updates) in storage_tries {
2378            let changeset_key = BlockNumberHashedAddress((block_number, *hashed_address));
2379
2380            // Update the hashed address for the cursors
2381            changed_curr_values_cursor =
2382                DatabaseStorageTrieCursor::new(changed_curr_values_cursor.cursor, *hashed_address);
2383
2384            // Get the overlay updates for this storage trie, or use an empty array
2385            let overlay_updates = updates_overlay
2386                .and_then(|overlay| overlay.storage_tries_ref().get(hashed_address))
2387                .map(|updates| updates.storage_nodes_ref())
2388                .unwrap_or(&EMPTY_UPDATES);
2389
2390            // Wrap the cursor in InMemoryTrieCursor with the overlay
2391            let mut in_memory_changed_cursor =
2392                InMemoryTrieCursor::new(Some(&mut changed_curr_values_cursor), overlay_updates);
2393
2394            // Create an iterator which produces the current values of all updated paths, or None if
2395            // they are currently unset.
2396            let curr_values_of_changed = StorageTrieCurrentValuesIter::new(
2397                storage_trie_updates.storage_nodes.iter().map(|e| e.0),
2398                &mut in_memory_changed_cursor,
2399            )?;
2400
2401            if storage_trie_updates.is_deleted() {
2402                // Create an iterator that starts from the beginning of the storage trie for this
2403                // account
2404                wiped_nodes_cursor =
2405                    DatabaseStorageTrieCursor::new(wiped_nodes_cursor.cursor, *hashed_address);
2406
2407                // Wrap the wiped nodes cursor in InMemoryTrieCursor with the overlay
2408                let mut in_memory_wiped_cursor =
2409                    InMemoryTrieCursor::new(Some(&mut wiped_nodes_cursor), overlay_updates);
2410
2411                let all_nodes = TrieCursorIter::new(&mut in_memory_wiped_cursor);
2412
2413                for wiped in storage_trie_wiped_changeset_iter(curr_values_of_changed, all_nodes)? {
2414                    let (path, node) = wiped?;
2415                    num_written += 1;
2416                    changeset_cursor.append_dup(
2417                        changeset_key,
2418                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2419                    )?;
2420                }
2421            } else {
2422                for curr_value in curr_values_of_changed {
2423                    let (path, node) = curr_value?;
2424                    num_written += 1;
2425                    changeset_cursor.append_dup(
2426                        changeset_key,
2427                        TrieChangeSetsEntry { nibbles: StoredNibblesSubKey(path), node },
2428                    )?;
2429                }
2430            }
2431        }
2432
2433        Ok(num_written)
2434    }
2435}
2436
2437impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2438    fn unwind_account_hashing<'a>(
2439        &self,
2440        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2441    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2442        // Aggregate all block changesets and make a list of accounts that have been changed.
2443        // Note that collecting and then reversing the order is necessary to ensure that the
2444        // changes are applied in the correct order.
2445        let hashed_accounts = changesets
2446            .into_iter()
2447            .map(|(_, e)| (keccak256(e.address), e.info))
2448            .collect::<Vec<_>>()
2449            .into_iter()
2450            .rev()
2451            .collect::<BTreeMap<_, _>>();
2452
2453        // Apply values to HashedState, and remove the account if it's None.
2454        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2455        for (hashed_address, account) in &hashed_accounts {
2456            if let Some(account) = account {
2457                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2458            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2459                hashed_accounts_cursor.delete_current()?;
2460            }
2461        }
2462
2463        Ok(hashed_accounts)
2464    }
2465
2466    fn unwind_account_hashing_range(
2467        &self,
2468        range: impl RangeBounds<BlockNumber>,
2469    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2470        let changesets = self
2471            .tx
2472            .cursor_read::<tables::AccountChangeSets>()?
2473            .walk_range(range)?
2474            .collect::<Result<Vec<_>, _>>()?;
2475        self.unwind_account_hashing(changesets.iter())
2476    }
2477
2478    fn insert_account_for_hashing(
2479        &self,
2480        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2481    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2482        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2483        let hashed_accounts =
2484            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2485        for (hashed_address, account) in &hashed_accounts {
2486            if let Some(account) = account {
2487                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2488            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2489                hashed_accounts_cursor.delete_current()?;
2490            }
2491        }
2492        Ok(hashed_accounts)
2493    }
2494
2495    fn unwind_storage_hashing(
2496        &self,
2497        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2498    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2499        // Aggregate all block changesets and make list of accounts that have been changed.
2500        let mut hashed_storages = changesets
2501            .into_iter()
2502            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2503                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2504            })
2505            .collect::<Vec<_>>();
2506        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2507
2508        // Apply values to HashedState, and remove the account if it's None.
2509        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2510            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2511        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2512        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2513            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2514
2515            if hashed_storage
2516                .seek_by_key_subkey(hashed_address, key)?
2517                .filter(|entry| entry.key == key)
2518                .is_some()
2519            {
2520                hashed_storage.delete_current()?;
2521            }
2522
2523            if !value.is_zero() {
2524                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2525            }
2526        }
2527        Ok(hashed_storage_keys)
2528    }
2529
2530    fn unwind_storage_hashing_range(
2531        &self,
2532        range: impl RangeBounds<BlockNumberAddress>,
2533    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2534        let changesets = self
2535            .tx
2536            .cursor_read::<tables::StorageChangeSets>()?
2537            .walk_range(range)?
2538            .collect::<Result<Vec<_>, _>>()?;
2539        self.unwind_storage_hashing(changesets.into_iter())
2540    }
2541
2542    fn insert_storage_for_hashing(
2543        &self,
2544        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2545    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2546        // hash values
2547        let hashed_storages =
2548            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2549                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2550                    map.insert(keccak256(entry.key), entry.value);
2551                    map
2552                });
2553                map.insert(keccak256(address), storage);
2554                map
2555            });
2556
2557        let hashed_storage_keys = hashed_storages
2558            .iter()
2559            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2560            .collect();
2561
2562        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2563        // Hash the address and key and apply them to HashedStorage (if Storage is None
2564        // just remove it);
2565        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2566            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2567                if hashed_storage_cursor
2568                    .seek_by_key_subkey(hashed_address, key)?
2569                    .filter(|entry| entry.key == key)
2570                    .is_some()
2571                {
2572                    hashed_storage_cursor.delete_current()?;
2573                }
2574
2575                if !value.is_zero() {
2576                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2577                }
2578                Ok(())
2579            })
2580        })?;
2581
2582        Ok(hashed_storage_keys)
2583    }
2584}
2585
2586impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2587    fn unwind_account_history_indices<'a>(
2588        &self,
2589        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2590    ) -> ProviderResult<usize> {
2591        let mut last_indices = changesets
2592            .into_iter()
2593            .map(|(index, account)| (account.address, *index))
2594            .collect::<Vec<_>>();
2595        last_indices.sort_by_key(|(a, _)| *a);
2596
2597        // Unwind the account history index.
2598        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2599        for &(address, rem_index) in &last_indices {
2600            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2601                &mut cursor,
2602                ShardedKey::last(address),
2603                rem_index,
2604                |sharded_key| sharded_key.key == address,
2605            )?;
2606
2607            // Check the last returned partial shard.
2608            // If it's not empty, the shard needs to be reinserted.
2609            if !partial_shard.is_empty() {
2610                cursor.insert(
2611                    ShardedKey::last(address),
2612                    &BlockNumberList::new_pre_sorted(partial_shard),
2613                )?;
2614            }
2615        }
2616
2617        let changesets = last_indices.len();
2618        Ok(changesets)
2619    }
2620
2621    fn unwind_account_history_indices_range(
2622        &self,
2623        range: impl RangeBounds<BlockNumber>,
2624    ) -> ProviderResult<usize> {
2625        let changesets = self
2626            .tx
2627            .cursor_read::<tables::AccountChangeSets>()?
2628            .walk_range(range)?
2629            .collect::<Result<Vec<_>, _>>()?;
2630        self.unwind_account_history_indices(changesets.iter())
2631    }
2632
2633    fn insert_account_history_index(
2634        &self,
2635        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2636    ) -> ProviderResult<()> {
2637        self.append_history_index::<_, tables::AccountsHistory>(
2638            account_transitions,
2639            ShardedKey::new,
2640        )
2641    }
2642
2643    fn unwind_storage_history_indices(
2644        &self,
2645        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2646    ) -> ProviderResult<usize> {
2647        let mut storage_changesets = changesets
2648            .into_iter()
2649            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2650            .collect::<Vec<_>>();
2651        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2652
2653        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2654        for &(address, storage_key, rem_index) in &storage_changesets {
2655            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2656                &mut cursor,
2657                StorageShardedKey::last(address, storage_key),
2658                rem_index,
2659                |storage_sharded_key| {
2660                    storage_sharded_key.address == address &&
2661                        storage_sharded_key.sharded_key.key == storage_key
2662                },
2663            )?;
2664
2665            // Check the last returned partial shard.
2666            // If it's not empty, the shard needs to be reinserted.
2667            if !partial_shard.is_empty() {
2668                cursor.insert(
2669                    StorageShardedKey::last(address, storage_key),
2670                    &BlockNumberList::new_pre_sorted(partial_shard),
2671                )?;
2672            }
2673        }
2674
2675        let changesets = storage_changesets.len();
2676        Ok(changesets)
2677    }
2678
2679    fn unwind_storage_history_indices_range(
2680        &self,
2681        range: impl RangeBounds<BlockNumberAddress>,
2682    ) -> ProviderResult<usize> {
2683        let changesets = self
2684            .tx
2685            .cursor_read::<tables::StorageChangeSets>()?
2686            .walk_range(range)?
2687            .collect::<Result<Vec<_>, _>>()?;
2688        self.unwind_storage_history_indices(changesets.into_iter())
2689    }
2690
2691    fn insert_storage_history_index(
2692        &self,
2693        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2694    ) -> ProviderResult<()> {
2695        self.append_history_index::<_, tables::StoragesHistory>(
2696            storage_transitions,
2697            |(address, storage_key), highest_block_number| {
2698                StorageShardedKey::new(address, storage_key, highest_block_number)
2699            },
2700        )
2701    }
2702
2703    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2704        // account history stage
2705        {
2706            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2707            self.insert_account_history_index(indices)?;
2708        }
2709
2710        // storage history stage
2711        {
2712            let indices = self.changed_storages_and_blocks_with_range(range)?;
2713            self.insert_storage_history_index(indices)?;
2714        }
2715
2716        Ok(())
2717    }
2718}
2719
2720impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2721    for DatabaseProvider<TX, N>
2722{
2723    fn take_block_and_execution_above(
2724        &self,
2725        block: BlockNumber,
2726    ) -> ProviderResult<Chain<Self::Primitives>> {
2727        let range = block + 1..=self.last_block_number()?;
2728
2729        self.unwind_trie_state_from(block + 1)?;
2730
2731        // get execution res
2732        let execution_state = self.take_state_above(block)?;
2733
2734        let blocks = self.recovered_block_range(range)?;
2735
2736        // remove block bodies it is needed for both get block range and get block execution results
2737        // that is why it is deleted afterwards.
2738        self.remove_blocks_above(block)?;
2739
2740        // Update pipeline progress
2741        self.update_pipeline_stages(block, true)?;
2742
2743        Ok(Chain::new(blocks, execution_state, None))
2744    }
2745
2746    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2747        self.unwind_trie_state_from(block + 1)?;
2748
2749        // remove execution res
2750        self.remove_state_above(block)?;
2751
2752        // remove block bodies it is needed for both get block range and get block execution results
2753        // that is why it is deleted afterwards.
2754        self.remove_blocks_above(block)?;
2755
2756        // Update pipeline progress
2757        self.update_pipeline_stages(block, true)?;
2758
2759        Ok(())
2760    }
2761}
2762
2763impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2764    for DatabaseProvider<TX, N>
2765{
2766    type Block = BlockTy<N>;
2767    type Receipt = ReceiptTy<N>;
2768
2769    /// Inserts the block into the database, always modifying the following static file segments and
2770    /// tables:
2771    /// * [`StaticFileSegment::Headers`]
2772    /// * [`tables::HeaderNumbers`]
2773    /// * [`tables::BlockBodyIndices`]
2774    ///
2775    /// If there are transactions in the block, the following static file segments and tables will
2776    /// be modified:
2777    /// * [`StaticFileSegment::Transactions`]
2778    /// * [`tables::TransactionBlocks`]
2779    ///
2780    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2781    /// If withdrawals are not empty, this will modify
2782    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2783    ///
2784    /// If the provider has __not__ configured full sender pruning, this will modify
2785    /// [`TransactionSenders`](tables::TransactionSenders).
2786    ///
2787    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2788    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2789    fn insert_block(
2790        &self,
2791        block: RecoveredBlock<Self::Block>,
2792    ) -> ProviderResult<StoredBlockBodyIndices> {
2793        let block_number = block.number();
2794
2795        let mut durations_recorder = metrics::DurationsRecorder::default();
2796
2797        self.static_file_provider
2798            .get_writer(block_number, StaticFileSegment::Headers)?
2799            .append_header(block.header(), &block.hash())?;
2800
2801        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2802        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2803
2804        let mut next_tx_num = self
2805            .tx
2806            .cursor_read::<tables::TransactionBlocks>()?
2807            .last()?
2808            .map(|(n, _)| n + 1)
2809            .unwrap_or_default();
2810        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2811        let first_tx_num = next_tx_num;
2812
2813        let tx_count = block.body().transaction_count() as u64;
2814
2815        // Ensures we have all the senders for the block's transactions.
2816        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2817            let hash = transaction.tx_hash();
2818
2819            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2820                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2821            }
2822
2823            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2824                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2825            }
2826            next_tx_num += 1;
2827        }
2828
2829        self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2830
2831        debug!(
2832            target: "providers::db",
2833            ?block_number,
2834            actions = ?durations_recorder.actions,
2835            "Inserted block"
2836        );
2837
2838        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2839    }
2840
2841    fn append_block_bodies(
2842        &self,
2843        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2844    ) -> ProviderResult<()> {
2845        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2846
2847        // Initialize writer if we will be writing transactions to staticfiles
2848        let mut tx_writer =
2849            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2850
2851        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2852        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2853
2854        // Get id for the next tx_num or zero if there are no transactions.
2855        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2856
2857        for (block_number, body) in &bodies {
2858            // Increment block on static file header.
2859            tx_writer.increment_block(*block_number)?;
2860
2861            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2862            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2863
2864            let mut durations_recorder = metrics::DurationsRecorder::default();
2865
2866            // insert block meta
2867            block_indices_cursor.append(*block_number, &block_indices)?;
2868
2869            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2870
2871            let Some(body) = body else { continue };
2872
2873            // write transaction block index
2874            if !body.transactions().is_empty() {
2875                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2876                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2877            }
2878
2879            // write transactions
2880            for transaction in body.transactions() {
2881                tx_writer.append_transaction(next_tx_num, transaction)?;
2882
2883                // Increment transaction id for each transaction.
2884                next_tx_num += 1;
2885            }
2886        }
2887
2888        self.storage.writer().write_block_bodies(self, bodies)?;
2889
2890        Ok(())
2891    }
2892
2893    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2894        // Clean up HeaderNumbers for blocks being removed, we must clear all indexes from MDBX.
2895        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2896            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2897        }
2898
2899        // Get highest static file block for the total block range
2900        let highest_static_file_block = self
2901            .static_file_provider()
2902            .get_highest_static_file_block(StaticFileSegment::Headers)
2903            .expect("todo: error handling, headers should exist");
2904
2905        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
2906        // we remove only what is ABOVE the block.
2907        //
2908        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
2909        // will have three blocks to remove, which will be block 8, 7, and 6.
2910        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2911        self.static_file_provider()
2912            .get_writer(block, StaticFileSegment::Headers)?
2913            .prune_headers(highest_static_file_block.saturating_sub(block))?;
2914
2915        // First transaction to be removed
2916        let unwind_tx_from = self
2917            .block_body_indices(block)?
2918            .map(|b| b.next_tx_num())
2919            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2920
2921        // Last transaction to be removed
2922        let unwind_tx_to = self
2923            .tx
2924            .cursor_read::<tables::BlockBodyIndices>()?
2925            .last()?
2926            // shouldn't happen because this was OK above
2927            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2928            .1
2929            .last_tx_num();
2930
2931        if unwind_tx_from <= unwind_tx_to {
2932            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2933                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2934            }
2935        }
2936
2937        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2938
2939        self.remove_bodies_above(block)?;
2940
2941        Ok(())
2942    }
2943
2944    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2945        self.storage.writer().remove_block_bodies_above(self, block)?;
2946
2947        // First transaction to be removed
2948        let unwind_tx_from = self
2949            .block_body_indices(block)?
2950            .map(|b| b.next_tx_num())
2951            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2952
2953        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
2954        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
2955
2956        let static_file_tx_num =
2957            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
2958
2959        let to_delete = static_file_tx_num
2960            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
2961            .unwrap_or_default();
2962
2963        self.static_file_provider
2964            .latest_writer(StaticFileSegment::Transactions)?
2965            .prune_transactions(to_delete, block)?;
2966
2967        Ok(())
2968    }
2969
2970    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
2971    fn append_blocks_with_state(
2972        &self,
2973        blocks: Vec<RecoveredBlock<Self::Block>>,
2974        execution_outcome: &ExecutionOutcome<Self::Receipt>,
2975        hashed_state: HashedPostStateSorted,
2976    ) -> ProviderResult<()> {
2977        if blocks.is_empty() {
2978            debug!(target: "providers::db", "Attempted to append empty block range");
2979            return Ok(())
2980        }
2981
2982        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
2983        // `None`.
2984        let first_number = blocks[0].number();
2985
2986        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
2987        // `None`.
2988        let last_block_number = blocks[blocks.len() - 1].number();
2989
2990        let mut durations_recorder = metrics::DurationsRecorder::default();
2991
2992        // Insert the blocks
2993        for block in blocks {
2994            self.insert_block(block)?;
2995            durations_recorder.record_relative(metrics::Action::InsertBlock);
2996        }
2997
2998        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
2999        durations_recorder.record_relative(metrics::Action::InsertState);
3000
3001        // insert hashes and intermediate merkle nodes
3002        self.write_hashed_state(&hashed_state)?;
3003        durations_recorder.record_relative(metrics::Action::InsertHashes);
3004
3005        self.update_history_indices(first_number..=last_block_number)?;
3006        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3007
3008        // Update pipeline progress
3009        self.update_pipeline_stages(last_block_number, false)?;
3010        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3011
3012        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3013
3014        Ok(())
3015    }
3016}
3017
3018impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3019    fn get_prune_checkpoint(
3020        &self,
3021        segment: PruneSegment,
3022    ) -> ProviderResult<Option<PruneCheckpoint>> {
3023        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3024    }
3025
3026    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3027        Ok(PRUNE_SEGMENTS
3028            .iter()
3029            .filter_map(|segment| {
3030                self.tx
3031                    .get::<tables::PruneCheckpoints>(*segment)
3032                    .transpose()
3033                    .map(|chk| chk.map(|chk| (*segment, chk)))
3034            })
3035            .collect::<Result<_, _>>()?)
3036    }
3037}
3038
3039impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3040    fn save_prune_checkpoint(
3041        &self,
3042        segment: PruneSegment,
3043        checkpoint: PruneCheckpoint,
3044    ) -> ProviderResult<()> {
3045        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3046    }
3047}
3048
3049impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3050    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3051        let db_entries = self.tx.entries::<T>()?;
3052        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3053            Ok(entries) => entries,
3054            Err(ProviderError::UnsupportedProvider) => 0,
3055            Err(err) => return Err(err),
3056        };
3057
3058        Ok(db_entries + static_file_entries)
3059    }
3060}
3061
3062impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3063    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3064        let mut finalized_blocks = self
3065            .tx
3066            .cursor_read::<tables::ChainState>()?
3067            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3068            .take(1)
3069            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3070
3071        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3072        Ok(last_finalized_block_number)
3073    }
3074
3075    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3076        let mut finalized_blocks = self
3077            .tx
3078            .cursor_read::<tables::ChainState>()?
3079            .walk(Some(tables::ChainStateKey::LastSafeBlock))?
3080            .take(1)
3081            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3082
3083        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3084        Ok(last_finalized_block_number)
3085    }
3086}
3087
3088impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3089    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3090        Ok(self
3091            .tx
3092            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3093    }
3094
3095    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3096        Ok(self.tx.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlock, block_number)?)
3097    }
3098}
3099
impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
    /// The database transaction type wrapped by this provider.
    type Tx = TX;

    /// Returns a shared reference to the underlying database transaction.
    fn tx_ref(&self) -> &Self::Tx {
        &self.tx
    }

    /// Returns an exclusive reference to the underlying database transaction.
    fn tx_mut(&mut self) -> &mut Self::Tx {
        &mut self.tx
    }

    /// Consumes the provider and returns the underlying database transaction.
    fn into_tx(self) -> Self::Tx {
        self.tx
    }

    /// Returns a reference to the prune modes of this provider.
    fn prune_modes_ref(&self) -> &PruneModes {
        // NOTE(review): presumably resolves to an inherent `DatabaseProvider::prune_modes_ref`
        // defined elsewhere in this file (inherent methods take precedence over trait methods
        // during method resolution). Without such an inherent method this call would recurse
        // into itself, so confirm before renaming or removing it.
        self.prune_modes_ref()
    }

    /// Commit database transaction and static files.
    ///
    /// The commit order depends on whether the static file provider has an unwind queued (see
    /// the comment in the body). Returns `Ok(true)` once both commits succeeded; an error from
    /// the first commit is returned before the second one is attempted.
    fn commit(self) -> ProviderResult<bool> {
        // For unwinding it makes more sense to commit the database first, since if
        // it is interrupted before the static files commit, we can just
        // truncate the static files according to the
        // checkpoints on the next start-up.
        if self.static_file_provider.has_unwind_queued() {
            self.tx.commit()?;
            self.static_file_provider.commit()?;
        } else {
            // No unwind queued: static files are committed before the database.
            self.static_file_provider.commit()?;
            self.tx.commit()?;
        }

        Ok(true)
    }
}
3136
3137#[cfg(test)]
3138mod tests {
3139    use super::*;
3140    use crate::{
3141        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3142        BlockWriter,
3143    };
3144    use reth_testing_utils::generators::{self, random_block, BlockParams};
3145
3146    #[test]
3147    fn test_receipts_by_block_range_empty_range() {
3148        let factory = create_test_provider_factory();
3149        let provider = factory.provider().unwrap();
3150
3151        // empty range should return empty vec
3152        let start = 10u64;
3153        let end = 9u64;
3154        let result = provider.receipts_by_block_range(start..=end).unwrap();
3155        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3156    }
3157
3158    #[test]
3159    fn test_receipts_by_block_range_nonexistent_blocks() {
3160        let factory = create_test_provider_factory();
3161        let provider = factory.provider().unwrap();
3162
3163        // non-existent blocks should return empty vecs for each block
3164        let result = provider.receipts_by_block_range(10..=12).unwrap();
3165        assert_eq!(result, vec![vec![], vec![], vec![]]);
3166    }
3167
3168    #[test]
3169    fn test_receipts_by_block_range_single_block() {
3170        let factory = create_test_provider_factory();
3171        let data = BlockchainTestData::default();
3172
3173        let provider_rw = factory.provider_rw().unwrap();
3174        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3175        provider_rw
3176            .write_state(
3177                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3178                crate::OriginalValuesKnown::No,
3179            )
3180            .unwrap();
3181        provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
3182        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
3183        provider_rw.commit().unwrap();
3184
3185        let provider = factory.provider().unwrap();
3186        let result = provider.receipts_by_block_range(1..=1).unwrap();
3187
3188        // should have one vec with one receipt
3189        assert_eq!(result.len(), 1);
3190        assert_eq!(result[0].len(), 1);
3191        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3192    }
3193
3194    #[test]
3195    fn test_receipts_by_block_range_multiple_blocks() {
3196        let factory = create_test_provider_factory();
3197        let data = BlockchainTestData::default();
3198
3199        let provider_rw = factory.provider_rw().unwrap();
3200        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3201        provider_rw
3202            .write_state(
3203                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3204                crate::OriginalValuesKnown::No,
3205            )
3206            .unwrap();
3207        for i in 0..3 {
3208            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3209            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3210        }
3211        provider_rw.commit().unwrap();
3212
3213        let provider = factory.provider().unwrap();
3214        let result = provider.receipts_by_block_range(1..=3).unwrap();
3215
3216        // should have 3 vecs, each with one receipt
3217        assert_eq!(result.len(), 3);
3218        for (i, block_receipts) in result.iter().enumerate() {
3219            assert_eq!(block_receipts.len(), 1);
3220            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3221        }
3222    }
3223
3224    #[test]
3225    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3226        let factory = create_test_provider_factory();
3227        let data = BlockchainTestData::default();
3228
3229        let provider_rw = factory.provider_rw().unwrap();
3230        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3231        provider_rw
3232            .write_state(
3233                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3234                crate::OriginalValuesKnown::No,
3235            )
3236            .unwrap();
3237
3238        // insert blocks 1-3 with receipts
3239        for i in 0..3 {
3240            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3241            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3242        }
3243        provider_rw.commit().unwrap();
3244
3245        let provider = factory.provider().unwrap();
3246        let result = provider.receipts_by_block_range(1..=3).unwrap();
3247
3248        // verify each block has one receipt
3249        assert_eq!(result.len(), 3);
3250        for block_receipts in &result {
3251            assert_eq!(block_receipts.len(), 1);
3252        }
3253    }
3254
3255    #[test]
3256    fn test_receipts_by_block_range_partial_range() {
3257        let factory = create_test_provider_factory();
3258        let data = BlockchainTestData::default();
3259
3260        let provider_rw = factory.provider_rw().unwrap();
3261        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3262        provider_rw
3263            .write_state(
3264                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3265                crate::OriginalValuesKnown::No,
3266            )
3267            .unwrap();
3268        for i in 0..3 {
3269            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3270            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3271        }
3272        provider_rw.commit().unwrap();
3273
3274        let provider = factory.provider().unwrap();
3275
3276        // request range that includes both existing and non-existing blocks
3277        let result = provider.receipts_by_block_range(2..=5).unwrap();
3278        assert_eq!(result.len(), 4);
3279
3280        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3281        assert_eq!(result[0].len(), 1); // block 2
3282        assert_eq!(result[1].len(), 1); // block 3
3283        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3284        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3285
3286        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3287        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3288    }
3289
3290    #[test]
3291    fn test_receipts_by_block_range_all_empty_blocks() {
3292        let factory = create_test_provider_factory();
3293        let mut rng = generators::rng();
3294
3295        // create blocks with no transactions
3296        let mut blocks = Vec::new();
3297        for i in 0..3 {
3298            let block =
3299                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3300            blocks.push(block);
3301        }
3302
3303        let provider_rw = factory.provider_rw().unwrap();
3304        for block in blocks {
3305            provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3306        }
3307        provider_rw.commit().unwrap();
3308
3309        let provider = factory.provider().unwrap();
3310        let result = provider.receipts_by_block_range(1..=3).unwrap();
3311
3312        assert_eq!(result.len(), 3);
3313        for block_receipts in result {
3314            assert_eq!(block_receipts.len(), 0);
3315        }
3316    }
3317
3318    #[test]
3319    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3320        let factory = create_test_provider_factory();
3321        let data = BlockchainTestData::default();
3322
3323        let provider_rw = factory.provider_rw().unwrap();
3324        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3325        provider_rw
3326            .write_state(
3327                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3328                crate::OriginalValuesKnown::No,
3329            )
3330            .unwrap();
3331        for i in 0..3 {
3332            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3333            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3334        }
3335        provider_rw.commit().unwrap();
3336
3337        let provider = factory.provider().unwrap();
3338
3339        // get receipts using block range method
3340        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3341
3342        // get receipts using individual block calls
3343        let mut individual_results = Vec::new();
3344        for block_num in 1..=3 {
3345            let receipts =
3346                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3347            individual_results.push(receipts);
3348        }
3349
3350        assert_eq!(range_result, individual_results);
3351    }
3352
    /// Checks `write_trie_changesets` without an overlay: the previous node values recorded in
    /// the account and storage trie changeset tables must come from what was pre-populated in
    /// the database, for a normal storage trie as well as for a wiped (`is_deleted`) one.
    #[test]
    fn test_write_trie_changesets() {
        use reth_db_api::models::BlockNumberHashedAddress;
        use reth_trie::{BranchNodeCompact, StorageTrieEntry};

        let factory = create_test_provider_factory();
        let provider_rw = factory.provider_rw().unwrap();

        let block_number = 1u64;

        // Create some test nibbles and nodes
        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);

        let node1 = BranchNodeCompact::new(
            0b1111_1111_1111_1111, // state_mask
            0b0000_0000_0000_0000, // tree_mask
            0b0000_0000_0000_0000, // hash_mask
            vec![],                // hashes
            None,                  // root hash
        );

        // Pre-populate AccountsTrie with a node that will be updated (for account_nibbles1)
        {
            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
        }

        // Create account trie updates: one Some (update) and one None (removal)
        let account_nodes = vec![
            (account_nibbles1, Some(node1.clone())), // This will update existing node
            (account_nibbles2, None),                // This will be a removal (no existing node)
        ];

        // Create storage trie updates
        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie

        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);

        let storage_node1 = BranchNodeCompact::new(
            0b1111_0000_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        let storage_node2 = BranchNodeCompact::new(
            0b0000_1111_0000_0000,
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Create an old version of storage_node1 to prepopulate
        let storage_node1_old = BranchNodeCompact::new(
            0b1010_0000_0000_0000, // Different mask to show it's an old value
            0b0000_0000_0000_0000,
            0b0000_0000_0000_0000,
            vec![],
            None,
        );

        // Pre-populate StoragesTrie for normal storage (storage_address1)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1) with old value
            let entry = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1_old.clone(),
            };
            cursor.upsert(storage_address1, &entry).unwrap();
        }

        // Pre-populate StoragesTrie for wiped storage (storage_address2)
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
            // Add node that will be updated (storage_nibbles1)
            let entry1 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles1),
                node: storage_node1.clone(),
            };
            cursor.upsert(storage_address2, &entry1).unwrap();
            // Add node that won't be updated but exists (storage_nibbles3)
            let entry3 = StorageTrieEntry {
                nibbles: StoredNibblesSubKey(storage_nibbles3),
                node: storage_node2.clone(),
            };
            cursor.upsert(storage_address2, &entry3).unwrap();
        }

        // Normal storage trie: one Some (update of the pre-populated node) and one None for a
        // path that has no node in the db
        let storage_trie1 = StorageTrieUpdatesSorted {
            is_deleted: false,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // This will update existing node
                (storage_nibbles2, None), // None update, no node exists in db for this path
            ],
        };

        // Wiped storage trie
        let storage_trie2 = StorageTrieUpdatesSorted {
            is_deleted: true,
            storage_nodes: vec![
                (storage_nibbles1, Some(storage_node1.clone())), // Updated node already in db
                // Updated node not in db; storage_nibbles3 is in db but not updated
                (storage_nibbles2, Some(storage_node2.clone())),
            ],
        };

        let mut storage_tries = B256Map::default();
        storage_tries.insert(storage_address1, storage_trie1);
        storage_tries.insert(storage_address2, storage_trie2);

        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);

        // Write the changesets (no overlay is passed, so previous values are read from the db)
        let num_written =
            provider_rw.write_trie_changesets(block_number, &trie_updates, None).unwrap();

        // Verify number of entries written
        // Account changesets: 2 (one update, one removal)
        // Storage changesets:
        //   - Normal storage: 2 (one update, one removal)
        //   - Wiped storage: 3 (two updated, one existing not updated)
        // Total: 2 + 2 + 3 = 7
        assert_eq!(num_written, 7);

        // Verify account changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();

            // Get all entries for this block to see what was written
            let all_entries = cursor
                .walk_dup(Some(block_number), None)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

            // Assert the full value of all_entries in a single assert_eq
            assert_eq!(
                all_entries,
                vec![
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles1),
                            // Pre-populated value (same node as the one in the update)
                            node: Some(node1),
                        }
                    ),
                    (
                        block_number,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(account_nibbles2),
                            // Nothing was pre-populated for this path
                            node: None,
                        }
                    ),
                ]
            );
        }

        // Verify storage changesets were written correctly
        {
            let mut cursor =
                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();

            // Check normal storage trie changesets
            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
            let entries1 =
                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries1,
                vec![
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1_old), // Old value that was prepopulated
                        }
                    ),
                    (
                        key1,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // No node in db for this path, so no previous value
                        }
                    ),
                ]
            );

            // Check wiped storage trie changesets
            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
            let entries2 =
                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();

            assert_eq!(
                entries2,
                vec![
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles1),
                            node: Some(storage_node1), // Was in db, so has old value
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles2),
                            node: None, // Was not in db
                        }
                    ),
                    (
                        key2,
                        TrieChangeSetsEntry {
                            nibbles: StoredNibblesSubKey(storage_nibbles3),
                            node: Some(storage_node2), // Existing node in wiped storage
                        }
                    ),
                ]
            );
        }

        provider_rw.commit().unwrap();
    }
3587
3588    #[test]
3589    fn test_write_trie_changesets_with_overlay() {
3590        use reth_db_api::models::BlockNumberHashedAddress;
3591        use reth_trie::BranchNodeCompact;
3592
3593        let factory = create_test_provider_factory();
3594        let provider_rw = factory.provider_rw().unwrap();
3595
3596        let block_number = 1u64;
3597
3598        // Create some test nibbles and nodes
3599        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
3600        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
3601
3602        let node1 = BranchNodeCompact::new(
3603            0b1111_1111_1111_1111, // state_mask
3604            0b0000_0000_0000_0000, // tree_mask
3605            0b0000_0000_0000_0000, // hash_mask
3606            vec![],                // hashes
3607            None,                  // root hash
3608        );
3609
3610        // NOTE: Unlike the previous test, we're NOT pre-populating the database
3611        // All node values will come from the overlay
3612
3613        // Create the overlay with existing values that would normally be in the DB
3614        let node1_old = BranchNodeCompact::new(
3615            0b1010_1010_1010_1010, // Different mask to show it's the overlay "existing" value
3616            0b0000_0000_0000_0000,
3617            0b0000_0000_0000_0000,
3618            vec![],
3619            None,
3620        );
3621
3622        // Create overlay account nodes
3623        let overlay_account_nodes = vec![
3624            (account_nibbles1, Some(node1_old.clone())), // This simulates existing node in overlay
3625        ];
3626
3627        // Create account trie updates: one Some (update) and one None (removal)
3628        let account_nodes = vec![
3629            (account_nibbles1, Some(node1)), // This will update overlay node
3630            (account_nibbles2, None),        // This will be a removal (no existing node)
3631        ];
3632
3633        // Create storage trie updates
3634        let storage_address1 = B256::from([1u8; 32]); // Normal storage trie
3635        let storage_address2 = B256::from([2u8; 32]); // Wiped storage trie
3636
3637        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
3638        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
3639        let storage_nibbles3 = Nibbles::from_nibbles([0xe, 0xf]);
3640
3641        let storage_node1 = BranchNodeCompact::new(
3642            0b1111_0000_0000_0000,
3643            0b0000_0000_0000_0000,
3644            0b0000_0000_0000_0000,
3645            vec![],
3646            None,
3647        );
3648
3649        let storage_node2 = BranchNodeCompact::new(
3650            0b0000_1111_0000_0000,
3651            0b0000_0000_0000_0000,
3652            0b0000_0000_0000_0000,
3653            vec![],
3654            None,
3655        );
3656
3657        // Create old versions for overlay
3658        let storage_node1_old = BranchNodeCompact::new(
3659            0b1010_0000_0000_0000, // Different mask to show it's an old value
3660            0b0000_0000_0000_0000,
3661            0b0000_0000_0000_0000,
3662            vec![],
3663            None,
3664        );
3665
3666        // Create overlay storage nodes
3667        let mut overlay_storage_tries = B256Map::default();
3668
3669        // Overlay for normal storage (storage_address1)
3670        let overlay_storage_trie1 = StorageTrieUpdatesSorted {
3671            is_deleted: false,
3672            storage_nodes: vec![
3673                (storage_nibbles1, Some(storage_node1_old.clone())), /* Simulates existing in
3674                                                                      * overlay */
3675            ],
3676        };
3677
3678        // Overlay for wiped storage (storage_address2)
3679        let overlay_storage_trie2 = StorageTrieUpdatesSorted {
3680            is_deleted: false,
3681            storage_nodes: vec![
3682                (storage_nibbles1, Some(storage_node1.clone())), // Existing in overlay
3683                (storage_nibbles3, Some(storage_node2.clone())), // Also existing in overlay
3684            ],
3685        };
3686
3687        overlay_storage_tries.insert(storage_address1, overlay_storage_trie1);
3688        overlay_storage_tries.insert(storage_address2, overlay_storage_trie2);
3689
3690        let overlay = TrieUpdatesSorted::new(overlay_account_nodes, overlay_storage_tries);
3691
3692        // Normal storage trie: one Some (update) and one None (new)
3693        let storage_trie1 = StorageTrieUpdatesSorted {
3694            is_deleted: false,
3695            storage_nodes: vec![
3696                (storage_nibbles1, Some(storage_node1.clone())), // This will update overlay node
3697                (storage_nibbles2, None),                        // This is a new node
3698            ],
3699        };
3700
3701        // Wiped storage trie
3702        let storage_trie2 = StorageTrieUpdatesSorted {
3703            is_deleted: true,
3704            storage_nodes: vec![
3705                (storage_nibbles1, Some(storage_node1.clone())), // Updated node from overlay
3706                (storage_nibbles2, Some(storage_node2.clone())), /* Updated node not in overlay
3707                                                                  * storage_nibbles3 is in
3708                                                                  * overlay
3709                                                                  * but not updated */
3710            ],
3711        };
3712
3713        let mut storage_tries = B256Map::default();
3714        storage_tries.insert(storage_address1, storage_trie1);
3715        storage_tries.insert(storage_address2, storage_trie2);
3716
3717        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
3718
3719        // Write the changesets WITH OVERLAY
3720        let num_written =
3721            provider_rw.write_trie_changesets(block_number, &trie_updates, Some(&overlay)).unwrap();
3722
3723        // Verify number of entries written
3724        // Account changesets: 2 (one update from overlay, one removal)
3725        // Storage changesets:
3726        //   - Normal storage: 2 (one update from overlay, one new)
3727        //   - Wiped storage: 3 (two updated, one existing from overlay not updated)
3728        // Total: 2 + 2 + 3 = 7
3729        assert_eq!(num_written, 7);
3730
3731        // Verify account changesets were written correctly
3732        {
3733            let mut cursor =
3734                provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
3735
3736            // Get all entries for this block to see what was written
3737            let all_entries = cursor
3738                .walk_dup(Some(block_number), None)
3739                .unwrap()
3740                .collect::<Result<Vec<_>, _>>()
3741                .unwrap();
3742
3743            // Assert the full value of all_entries in a single assert_eq
3744            assert_eq!(
3745                all_entries,
3746                vec![
3747                    (
3748                        block_number,
3749                        TrieChangeSetsEntry {
3750                            nibbles: StoredNibblesSubKey(account_nibbles1),
3751                            node: Some(node1_old), // Value from overlay, not DB
3752                        }
3753                    ),
3754                    (
3755                        block_number,
3756                        TrieChangeSetsEntry {
3757                            nibbles: StoredNibblesSubKey(account_nibbles2),
3758                            node: None,
3759                        }
3760                    ),
3761                ]
3762            );
3763        }
3764
3765        // Verify storage changesets were written correctly
3766        {
3767            let mut cursor =
3768                provider_rw.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
3769
3770            // Check normal storage trie changesets
3771            let key1 = BlockNumberHashedAddress((block_number, storage_address1));
3772            let entries1 =
3773                cursor.walk_dup(Some(key1), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3774
3775            assert_eq!(
3776                entries1,
3777                vec![
3778                    (
3779                        key1,
3780                        TrieChangeSetsEntry {
3781                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3782                            node: Some(storage_node1_old), // Old value from overlay
3783                        }
3784                    ),
3785                    (
3786                        key1,
3787                        TrieChangeSetsEntry {
3788                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3789                            node: None, // New node, no previous value
3790                        }
3791                    ),
3792                ]
3793            );
3794
3795            // Check wiped storage trie changesets
3796            let key2 = BlockNumberHashedAddress((block_number, storage_address2));
3797            let entries2 =
3798                cursor.walk_dup(Some(key2), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
3799
3800            assert_eq!(
3801                entries2,
3802                vec![
3803                    (
3804                        key2,
3805                        TrieChangeSetsEntry {
3806                            nibbles: StoredNibblesSubKey(storage_nibbles1),
3807                            node: Some(storage_node1), // Value from overlay
3808                        }
3809                    ),
3810                    (
3811                        key2,
3812                        TrieChangeSetsEntry {
3813                            nibbles: StoredNibblesSubKey(storage_nibbles2),
3814                            node: None, // Was not in overlay
3815                        }
3816                    ),
3817                    (
3818                        key2,
3819                        TrieChangeSetsEntry {
3820                            nibbles: StoredNibblesSubKey(storage_nibbles3),
3821                            node: Some(storage_node2), /* Existing node from overlay in wiped
3822                                                        * storage */
3823                        }
3824                    ),
3825                ]
3826            );
3827        }
3828
3829        provider_rw.commit().unwrap();
3830    }
3831
3832    #[test]
3833    fn test_clear_trie_changesets_from() {
3834        use alloy_primitives::hex_literal::hex;
3835        use reth_db_api::models::BlockNumberHashedAddress;
3836        use reth_trie::{BranchNodeCompact, StoredNibblesSubKey, TrieChangeSetsEntry};
3837
3838        let factory = create_test_provider_factory();
3839
3840        // Create some test data for different block numbers
3841        let block1 = 100u64;
3842        let block2 = 101u64;
3843        let block3 = 102u64;
3844        let block4 = 103u64;
3845        let block5 = 104u64;
3846
3847        // Create test addresses for storage changesets
3848        let storage_address1 =
3849            B256::from(hex!("1111111111111111111111111111111111111111111111111111111111111111"));
3850        let storage_address2 =
3851            B256::from(hex!("2222222222222222222222222222222222222222222222222222222222222222"));
3852
3853        // Create test nibbles
3854        let nibbles1 = StoredNibblesSubKey(Nibbles::from_nibbles([0x1, 0x2, 0x3]));
3855        let nibbles2 = StoredNibblesSubKey(Nibbles::from_nibbles([0x4, 0x5, 0x6]));
3856        let nibbles3 = StoredNibblesSubKey(Nibbles::from_nibbles([0x7, 0x8, 0x9]));
3857
3858        // Create test nodes
3859        let node1 = BranchNodeCompact::new(
3860            0b1111_1111_1111_1111,
3861            0b1111_1111_1111_1111,
3862            0b0000_0000_0000_0001,
3863            vec![B256::from(hex!(
3864                "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
3865            ))],
3866            None,
3867        );
3868        let node2 = BranchNodeCompact::new(
3869            0b1111_1111_1111_1110,
3870            0b1111_1111_1111_1110,
3871            0b0000_0000_0000_0010,
3872            vec![B256::from(hex!(
3873                "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
3874            ))],
3875            Some(B256::from(hex!(
3876                "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
3877            ))),
3878        );
3879
3880        // Populate AccountsTrieChangeSets with data across multiple blocks
3881        {
3882            let provider_rw = factory.provider_rw().unwrap();
3883            let mut cursor =
3884                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
3885
3886            // Block 100: 2 entries (will be kept - before start block)
3887            cursor
3888                .upsert(
3889                    block1,
3890                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3891                )
3892                .unwrap();
3893            cursor
3894                .upsert(block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3895                .unwrap();
3896
3897            // Block 101: 3 entries with duplicates (will be deleted - from this block onwards)
3898            cursor
3899                .upsert(
3900                    block2,
3901                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3902                )
3903                .unwrap();
3904            cursor
3905                .upsert(
3906                    block2,
3907                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3908                )
3909                .unwrap(); // duplicate key
3910            cursor
3911                .upsert(block2, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3912                .unwrap();
3913
3914            // Block 102: 2 entries (will be deleted - after start block)
3915            cursor
3916                .upsert(
3917                    block3,
3918                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3919                )
3920                .unwrap();
3921            cursor
3922                .upsert(
3923                    block3,
3924                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node2.clone()) },
3925                )
3926                .unwrap();
3927
3928            // Block 103: 1 entry (will be deleted - after start block)
3929            cursor
3930                .upsert(block4, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
3931                .unwrap();
3932
3933            // Block 104: 2 entries (will be deleted - after start block)
3934            cursor
3935                .upsert(
3936                    block5,
3937                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
3938                )
3939                .unwrap();
3940            cursor
3941                .upsert(block5, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3942                .unwrap();
3943
3944            provider_rw.commit().unwrap();
3945        }
3946
3947        // Populate StoragesTrieChangeSets with data across multiple blocks
3948        {
3949            let provider_rw = factory.provider_rw().unwrap();
3950            let mut cursor =
3951                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
3952
3953            // Block 100, address1: 2 entries (will be kept - before start block)
3954            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
3955            cursor
3956                .upsert(
3957                    key1_block1,
3958                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node1.clone()) },
3959                )
3960                .unwrap();
3961            cursor
3962                .upsert(key1_block1, &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: None })
3963                .unwrap();
3964
3965            // Block 101, address1: 3 entries with duplicates (will be deleted - from this block
3966            // onwards)
3967            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
3968            cursor
3969                .upsert(
3970                    key1_block2,
3971                    &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: Some(node2.clone()) },
3972                )
3973                .unwrap();
3974            cursor
3975                .upsert(key1_block2, &TrieChangeSetsEntry { nibbles: nibbles1.clone(), node: None })
3976                .unwrap(); // duplicate key
3977            cursor
3978                .upsert(
3979                    key1_block2,
3980                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node1.clone()) },
3981                )
3982                .unwrap();
3983
3984            // Block 102, address2: 2 entries (will be deleted - after start block)
3985            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
3986            cursor
3987                .upsert(
3988                    key2_block3,
3989                    &TrieChangeSetsEntry { nibbles: nibbles2.clone(), node: Some(node2.clone()) },
3990                )
3991                .unwrap();
3992            cursor
3993                .upsert(key2_block3, &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: None })
3994                .unwrap();
3995
3996            // Block 103, address1: 2 entries with duplicate (will be deleted - after start block)
3997            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
3998            cursor
3999                .upsert(
4000                    key1_block4,
4001                    &TrieChangeSetsEntry { nibbles: nibbles3.clone(), node: Some(node1) },
4002                )
4003                .unwrap();
4004            cursor
4005                .upsert(
4006                    key1_block4,
4007                    &TrieChangeSetsEntry { nibbles: nibbles3, node: Some(node2.clone()) },
4008                )
4009                .unwrap(); // duplicate key
4010
4011            // Block 104, address2: 2 entries (will be deleted - after start block)
4012            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4013            cursor
4014                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles1, node: None })
4015                .unwrap();
4016            cursor
4017                .upsert(key2_block5, &TrieChangeSetsEntry { nibbles: nibbles2, node: Some(node2) })
4018                .unwrap();
4019
4020            provider_rw.commit().unwrap();
4021        }
4022
4023        // Clear all changesets from block 101 onwards
4024        {
4025            let provider_rw = factory.provider_rw().unwrap();
4026            provider_rw.clear_trie_changesets_from(block2).unwrap();
4027            provider_rw.commit().unwrap();
4028        }
4029
4030        // Verify AccountsTrieChangeSets after clearing
4031        {
4032            let provider = factory.provider().unwrap();
4033            let mut cursor =
4034                provider.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
4035
4036            // Block 100 should still exist (before range)
4037            let block1_entries = cursor
4038                .walk_dup(Some(block1), None)
4039                .unwrap()
4040                .collect::<Result<Vec<_>, _>>()
4041                .unwrap();
4042            assert_eq!(block1_entries.len(), 2, "Block 100 entries should be preserved");
4043            assert_eq!(block1_entries[0].0, block1);
4044            assert_eq!(block1_entries[1].0, block1);
4045
4046            // Blocks 101-104 should be deleted
4047            let block2_entries = cursor
4048                .walk_dup(Some(block2), None)
4049                .unwrap()
4050                .collect::<Result<Vec<_>, _>>()
4051                .unwrap();
4052            assert!(block2_entries.is_empty(), "Block 101 entries should be deleted");
4053
4054            let block3_entries = cursor
4055                .walk_dup(Some(block3), None)
4056                .unwrap()
4057                .collect::<Result<Vec<_>, _>>()
4058                .unwrap();
4059            assert!(block3_entries.is_empty(), "Block 102 entries should be deleted");
4060
4061            let block4_entries = cursor
4062                .walk_dup(Some(block4), None)
4063                .unwrap()
4064                .collect::<Result<Vec<_>, _>>()
4065                .unwrap();
4066            assert!(block4_entries.is_empty(), "Block 103 entries should be deleted");
4067
4068            // Block 104 should also be deleted
4069            let block5_entries = cursor
4070                .walk_dup(Some(block5), None)
4071                .unwrap()
4072                .collect::<Result<Vec<_>, _>>()
4073                .unwrap();
4074            assert!(block5_entries.is_empty(), "Block 104 entries should be deleted");
4075        }
4076
4077        // Verify StoragesTrieChangeSets after clearing
4078        {
4079            let provider = factory.provider().unwrap();
4080            let mut cursor =
4081                provider.tx_ref().cursor_dup_read::<tables::StoragesTrieChangeSets>().unwrap();
4082
4083            // Block 100 entries should still exist (before range)
4084            let key1_block1 = BlockNumberHashedAddress((block1, storage_address1));
4085            let block1_entries = cursor
4086                .walk_dup(Some(key1_block1), None)
4087                .unwrap()
4088                .collect::<Result<Vec<_>, _>>()
4089                .unwrap();
4090            assert_eq!(block1_entries.len(), 2, "Block 100 storage entries should be preserved");
4091
4092            // Blocks 101-104 entries should be deleted
4093            let key1_block2 = BlockNumberHashedAddress((block2, storage_address1));
4094            let block2_entries = cursor
4095                .walk_dup(Some(key1_block2), None)
4096                .unwrap()
4097                .collect::<Result<Vec<_>, _>>()
4098                .unwrap();
4099            assert!(block2_entries.is_empty(), "Block 101 storage entries should be deleted");
4100
4101            let key2_block3 = BlockNumberHashedAddress((block3, storage_address2));
4102            let block3_entries = cursor
4103                .walk_dup(Some(key2_block3), None)
4104                .unwrap()
4105                .collect::<Result<Vec<_>, _>>()
4106                .unwrap();
4107            assert!(block3_entries.is_empty(), "Block 102 storage entries should be deleted");
4108
4109            let key1_block4 = BlockNumberHashedAddress((block4, storage_address1));
4110            let block4_entries = cursor
4111                .walk_dup(Some(key1_block4), None)
4112                .unwrap()
4113                .collect::<Result<Vec<_>, _>>()
4114                .unwrap();
4115            assert!(block4_entries.is_empty(), "Block 103 storage entries should be deleted");
4116
4117            // Block 104 entries should also be deleted
4118            let key2_block5 = BlockNumberHashedAddress((block5, storage_address2));
4119            let block5_entries = cursor
4120                .walk_dup(Some(key2_block5), None)
4121                .unwrap()
4122                .collect::<Result<Vec<_>, _>>()
4123                .unwrap();
4124            assert!(block5_entries.is_empty(), "Block 104 storage entries should be deleted");
4125        }
4126    }
4127
4128    #[test]
4129    fn test_write_trie_updates_sorted() {
4130        use reth_trie::{
4131            updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted},
4132            BranchNodeCompact, StorageTrieEntry,
4133        };
4134
4135        let factory = create_test_provider_factory();
4136        let provider_rw = factory.provider_rw().unwrap();
4137
4138        // Pre-populate account trie with data that will be deleted
4139        {
4140            let tx = provider_rw.tx_ref();
4141            let mut cursor = tx.cursor_write::<tables::AccountsTrie>().unwrap();
4142
4143            // Add account node that will be deleted
4144            let to_delete = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4145            cursor
4146                .upsert(
4147                    to_delete,
4148                    &BranchNodeCompact::new(
4149                        0b1010_1010_1010_1010, // state_mask
4150                        0b0000_0000_0000_0000, // tree_mask
4151                        0b0000_0000_0000_0000, // hash_mask
4152                        vec![],
4153                        None,
4154                    ),
4155                )
4156                .unwrap();
4157
4158            // Add account node that will be updated
4159            let to_update = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4160            cursor
4161                .upsert(
4162                    to_update,
4163                    &BranchNodeCompact::new(
4164                        0b0101_0101_0101_0101, // old state_mask (will be updated)
4165                        0b0000_0000_0000_0000, // tree_mask
4166                        0b0000_0000_0000_0000, // hash_mask
4167                        vec![],
4168                        None,
4169                    ),
4170                )
4171                .unwrap();
4172        }
4173
4174        // Pre-populate storage tries with data
4175        let storage_address1 = B256::from([1u8; 32]);
4176        let storage_address2 = B256::from([2u8; 32]);
4177        {
4178            let tx = provider_rw.tx_ref();
4179            let mut storage_cursor = tx.cursor_dup_write::<tables::StoragesTrie>().unwrap();
4180
4181            // Add storage nodes for address1 (one will be deleted)
4182            storage_cursor
4183                .upsert(
4184                    storage_address1,
4185                    &StorageTrieEntry {
4186                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0x2, 0x0])),
4187                        node: BranchNodeCompact::new(
4188                            0b0011_0011_0011_0011, // will be deleted
4189                            0b0000_0000_0000_0000,
4190                            0b0000_0000_0000_0000,
4191                            vec![],
4192                            None,
4193                        ),
4194                    },
4195                )
4196                .unwrap();
4197
4198            // Add storage nodes for address2 (will be wiped)
4199            storage_cursor
4200                .upsert(
4201                    storage_address2,
4202                    &StorageTrieEntry {
4203                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xa, 0xb])),
4204                        node: BranchNodeCompact::new(
4205                            0b1100_1100_1100_1100, // will be wiped
4206                            0b0000_0000_0000_0000,
4207                            0b0000_0000_0000_0000,
4208                            vec![],
4209                            None,
4210                        ),
4211                    },
4212                )
4213                .unwrap();
4214            storage_cursor
4215                .upsert(
4216                    storage_address2,
4217                    &StorageTrieEntry {
4218                        nibbles: StoredNibblesSubKey(Nibbles::from_nibbles([0xc, 0xd])),
4219                        node: BranchNodeCompact::new(
4220                            0b0011_1100_0011_1100, // will be wiped
4221                            0b0000_0000_0000_0000,
4222                            0b0000_0000_0000_0000,
4223                            vec![],
4224                            None,
4225                        ),
4226                    },
4227                )
4228                .unwrap();
4229        }
4230
4231        // Create sorted account trie updates
4232        let account_nodes = vec![
4233            (
4234                Nibbles::from_nibbles([0x1, 0x2]),
4235                Some(BranchNodeCompact::new(
4236                    0b1111_1111_1111_1111, // state_mask (updated)
4237                    0b0000_0000_0000_0000, // tree_mask
4238                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4239                    vec![],
4240                    None,
4241                )),
4242            ),
4243            (Nibbles::from_nibbles([0x3, 0x4]), None), // Deletion
4244            (
4245                Nibbles::from_nibbles([0x5, 0x6]),
4246                Some(BranchNodeCompact::new(
4247                    0b1111_1111_1111_1111, // state_mask
4248                    0b0000_0000_0000_0000, // tree_mask
4249                    0b0000_0000_0000_0000, // hash_mask (no hashes)
4250                    vec![],
4251                    None,
4252                )),
4253            ),
4254        ];
4255
4256        // Create sorted storage trie updates
4257        let storage_trie1 = StorageTrieUpdatesSorted {
4258            is_deleted: false,
4259            storage_nodes: vec![
4260                (
4261                    Nibbles::from_nibbles([0x1, 0x0]),
4262                    Some(BranchNodeCompact::new(
4263                        0b1111_0000_0000_0000, // state_mask
4264                        0b0000_0000_0000_0000, // tree_mask
4265                        0b0000_0000_0000_0000, // hash_mask (no hashes)
4266                        vec![],
4267                        None,
4268                    )),
4269                ),
4270                (Nibbles::from_nibbles([0x2, 0x0]), None), // Deletion of existing node
4271            ],
4272        };
4273
4274        let storage_trie2 = StorageTrieUpdatesSorted {
4275            is_deleted: true, // Wipe all storage for this address
4276            storage_nodes: vec![],
4277        };
4278
4279        let mut storage_tries = B256Map::default();
4280        storage_tries.insert(storage_address1, storage_trie1);
4281        storage_tries.insert(storage_address2, storage_trie2);
4282
4283        let trie_updates = TrieUpdatesSorted::new(account_nodes, storage_tries);
4284
4285        // Write the sorted trie updates
4286        let num_entries = provider_rw.write_trie_updates_sorted(&trie_updates).unwrap();
4287
4288        // We should have 2 account insertions + 1 account deletion + 1 storage insertion + 1
4289        // storage deletion = 5
4290        assert_eq!(num_entries, 5);
4291
4292        // Verify account trie updates were written correctly
4293        let tx = provider_rw.tx_ref();
4294        let mut cursor = tx.cursor_read::<tables::AccountsTrie>().unwrap();
4295
4296        // Check first account node was updated
4297        let nibbles1 = StoredNibbles(Nibbles::from_nibbles([0x1, 0x2]));
4298        let entry1 = cursor.seek_exact(nibbles1).unwrap();
4299        assert!(entry1.is_some(), "Updated account node should exist");
4300        let expected_mask = reth_trie::TrieMask::new(0b1111_1111_1111_1111);
4301        assert_eq!(
4302            entry1.unwrap().1.state_mask,
4303            expected_mask,
4304            "Account node should have updated state_mask"
4305        );
4306
4307        // Check deleted account node no longer exists
4308        let nibbles2 = StoredNibbles(Nibbles::from_nibbles([0x3, 0x4]));
4309        let entry2 = cursor.seek_exact(nibbles2).unwrap();
4310        assert!(entry2.is_none(), "Deleted account node should not exist");
4311
4312        // Check new account node exists
4313        let nibbles3 = StoredNibbles(Nibbles::from_nibbles([0x5, 0x6]));
4314        let entry3 = cursor.seek_exact(nibbles3).unwrap();
4315        assert!(entry3.is_some(), "New account node should exist");
4316
4317        // Verify storage trie updates were written correctly
4318        let mut storage_cursor = tx.cursor_dup_read::<tables::StoragesTrie>().unwrap();
4319
4320        // Check storage for address1
4321        let storage_entries1: Vec<_> = storage_cursor
4322            .walk_dup(Some(storage_address1), None)
4323            .unwrap()
4324            .collect::<Result<Vec<_>, _>>()
4325            .unwrap();
4326        assert_eq!(
4327            storage_entries1.len(),
4328            1,
4329            "Storage address1 should have 1 entry after deletion"
4330        );
4331        assert_eq!(
4332            storage_entries1[0].1.nibbles.0,
4333            Nibbles::from_nibbles([0x1, 0x0]),
4334            "Remaining entry should be [0x1, 0x0]"
4335        );
4336
4337        // Check storage for address2 was wiped
4338        let storage_entries2: Vec<_> = storage_cursor
4339            .walk_dup(Some(storage_address2), None)
4340            .unwrap()
4341            .collect::<Result<Vec<_>, _>>()
4342            .unwrap();
4343        assert_eq!(storage_entries2.len(), 0, "Storage address2 should be empty after wipe");
4344
4345        provider_rw.commit().unwrap();
4346    }
4347
4348    #[test]
4349    fn test_get_block_trie_updates() {
4350        use reth_db_api::models::BlockNumberHashedAddress;
4351        use reth_trie::{BranchNodeCompact, StorageTrieEntry};
4352
4353        let factory = create_test_provider_factory();
4354        let provider_rw = factory.provider_rw().unwrap();
4355
4356        let target_block = 2u64;
4357        let next_block = 3u64;
4358
4359        // Create test nibbles and nodes for accounts
4360        let account_nibbles1 = Nibbles::from_nibbles([0x1, 0x2, 0x3, 0x4]);
4361        let account_nibbles2 = Nibbles::from_nibbles([0x5, 0x6, 0x7, 0x8]);
4362        let account_nibbles3 = Nibbles::from_nibbles([0x9, 0xa, 0xb, 0xc]);
4363
4364        let node1 = BranchNodeCompact::new(
4365            0b1111_1111_0000_0000,
4366            0b0000_0000_0000_0000,
4367            0b0000_0000_0000_0000,
4368            vec![],
4369            None,
4370        );
4371
4372        let node2 = BranchNodeCompact::new(
4373            0b0000_0000_1111_1111,
4374            0b0000_0000_0000_0000,
4375            0b0000_0000_0000_0000,
4376            vec![],
4377            None,
4378        );
4379
4380        let node3 = BranchNodeCompact::new(
4381            0b1010_1010_1010_1010,
4382            0b0000_0000_0000_0000,
4383            0b0000_0000_0000_0000,
4384            vec![],
4385            None,
4386        );
4387
4388        // Pre-populate AccountsTrie with nodes that will be the final state
4389        {
4390            let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
4391            cursor.insert(StoredNibbles(account_nibbles1), &node1).unwrap();
4392            cursor.insert(StoredNibbles(account_nibbles2), &node2).unwrap();
4393            // account_nibbles3 will be deleted (not in final state)
4394        }
4395
4396        // Insert trie changesets for target_block
4397        {
4398            let mut cursor =
4399                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4400            // nibbles1 was updated in target_block (old value stored)
4401            cursor
4402                .append_dup(
4403                    target_block,
4404                    TrieChangeSetsEntry {
4405                        nibbles: StoredNibblesSubKey(account_nibbles1),
4406                        node: Some(BranchNodeCompact::new(
4407                            0b1111_0000_0000_0000, // old value
4408                            0b0000_0000_0000_0000,
4409                            0b0000_0000_0000_0000,
4410                            vec![],
4411                            None,
4412                        )),
4413                    },
4414                )
4415                .unwrap();
4416            // nibbles2 was created in target_block (no old value)
4417            cursor
4418                .append_dup(
4419                    target_block,
4420                    TrieChangeSetsEntry {
4421                        nibbles: StoredNibblesSubKey(account_nibbles2),
4422                        node: None,
4423                    },
4424                )
4425                .unwrap();
4426        }
4427
4428        // Insert trie changesets for next_block (to test overlay)
4429        {
4430            let mut cursor =
4431                provider_rw.tx_ref().cursor_dup_write::<tables::AccountsTrieChangeSets>().unwrap();
4432            // nibbles3 was deleted in next_block (old value stored)
4433            cursor
4434                .append_dup(
4435                    next_block,
4436                    TrieChangeSetsEntry {
4437                        nibbles: StoredNibblesSubKey(account_nibbles3),
4438                        node: Some(node3),
4439                    },
4440                )
4441                .unwrap();
4442        }
4443
4444        // Storage trie updates
4445        let storage_address1 = B256::from([1u8; 32]);
4446        let storage_nibbles1 = Nibbles::from_nibbles([0xa, 0xb]);
4447        let storage_nibbles2 = Nibbles::from_nibbles([0xc, 0xd]);
4448
4449        let storage_node1 = BranchNodeCompact::new(
4450            0b1111_1111_1111_0000,
4451            0b0000_0000_0000_0000,
4452            0b0000_0000_0000_0000,
4453            vec![],
4454            None,
4455        );
4456
4457        let storage_node2 = BranchNodeCompact::new(
4458            0b0101_0101_0101_0101,
4459            0b0000_0000_0000_0000,
4460            0b0000_0000_0000_0000,
4461            vec![],
4462            None,
4463        );
4464
4465        // Pre-populate StoragesTrie with final state
4466        {
4467            let mut cursor =
4468                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrie>().unwrap();
4469            cursor
4470                .upsert(
4471                    storage_address1,
4472                    &StorageTrieEntry {
4473                        nibbles: StoredNibblesSubKey(storage_nibbles1),
4474                        node: storage_node1.clone(),
4475                    },
4476                )
4477                .unwrap();
4478            // storage_nibbles2 was deleted in next_block, so it's not in final state
4479        }
4480
4481        // Insert storage trie changesets for target_block
4482        {
4483            let mut cursor =
4484                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4485            let key = BlockNumberHashedAddress((target_block, storage_address1));
4486
4487            // storage_nibbles1 was updated
4488            cursor
4489                .append_dup(
4490                    key,
4491                    TrieChangeSetsEntry {
4492                        nibbles: StoredNibblesSubKey(storage_nibbles1),
4493                        node: Some(BranchNodeCompact::new(
4494                            0b0000_0000_1111_1111, // old value
4495                            0b0000_0000_0000_0000,
4496                            0b0000_0000_0000_0000,
4497                            vec![],
4498                            None,
4499                        )),
4500                    },
4501                )
4502                .unwrap();
4503
4504            // storage_nibbles2 was created
4505            cursor
4506                .append_dup(
4507                    key,
4508                    TrieChangeSetsEntry {
4509                        nibbles: StoredNibblesSubKey(storage_nibbles2),
4510                        node: None,
4511                    },
4512                )
4513                .unwrap();
4514        }
4515
4516        // Insert storage trie changesets for next_block (to test overlay)
4517        {
4518            let mut cursor =
4519                provider_rw.tx_ref().cursor_dup_write::<tables::StoragesTrieChangeSets>().unwrap();
4520            let key = BlockNumberHashedAddress((next_block, storage_address1));
4521
4522            // storage_nibbles2 was deleted in next_block
4523            cursor
4524                .append_dup(
4525                    key,
4526                    TrieChangeSetsEntry {
4527                        nibbles: StoredNibblesSubKey(storage_nibbles2),
4528                        node: Some(BranchNodeCompact::new(
4529                            0b0101_0101_0101_0101, // value that was deleted
4530                            0b0000_0000_0000_0000,
4531                            0b0000_0000_0000_0000,
4532                            vec![],
4533                            None,
4534                        )),
4535                    },
4536                )
4537                .unwrap();
4538        }
4539
4540        provider_rw.commit().unwrap();
4541
4542        // Now test get_block_trie_updates
4543        let provider = factory.provider().unwrap();
4544        let result = provider.get_block_trie_updates(target_block).unwrap();
4545
4546        // Verify account trie updates
4547        assert_eq!(result.account_nodes_ref().len(), 2, "Should have 2 account trie updates");
4548
4549        // Check nibbles1 - should have the current value (node1)
4550        let nibbles1_update = result
4551            .account_nodes_ref()
4552            .iter()
4553            .find(|(n, _)| n == &account_nibbles1)
4554            .expect("Should find nibbles1");
4555        assert!(nibbles1_update.1.is_some(), "nibbles1 should have a value");
4556        assert_eq!(
4557            nibbles1_update.1.as_ref().unwrap().state_mask,
4558            node1.state_mask,
4559            "nibbles1 should have current value"
4560        );
4561
4562        // Check nibbles2 - should have the current value (node2)
4563        let nibbles2_update = result
4564            .account_nodes_ref()
4565            .iter()
4566            .find(|(n, _)| n == &account_nibbles2)
4567            .expect("Should find nibbles2");
4568        assert!(nibbles2_update.1.is_some(), "nibbles2 should have a value");
4569        assert_eq!(
4570            nibbles2_update.1.as_ref().unwrap().state_mask,
4571            node2.state_mask,
4572            "nibbles2 should have current value"
4573        );
4574
4575        // nibbles3 should NOT be in the result (it was changed in next_block, not target_block)
4576        assert!(
4577            !result.account_nodes_ref().iter().any(|(n, _)| n == &account_nibbles3),
4578            "nibbles3 should not be in target_block updates"
4579        );
4580
4581        // Verify storage trie updates
4582        assert_eq!(result.storage_tries_ref().len(), 1, "Should have 1 storage trie");
4583        let storage_updates = result
4584            .storage_tries_ref()
4585            .get(&storage_address1)
4586            .expect("Should have storage updates for address1");
4587
4588        assert_eq!(storage_updates.storage_nodes.len(), 2, "Should have 2 storage node updates");
4589
4590        // Check storage_nibbles1 - should have current value
4591        let storage1_update = storage_updates
4592            .storage_nodes
4593            .iter()
4594            .find(|(n, _)| n == &storage_nibbles1)
4595            .expect("Should find storage_nibbles1");
4596        assert!(storage1_update.1.is_some(), "storage_nibbles1 should have a value");
4597        assert_eq!(
4598            storage1_update.1.as_ref().unwrap().state_mask,
4599            storage_node1.state_mask,
4600            "storage_nibbles1 should have current value"
4601        );
4602
4603        // Check storage_nibbles2 - was created in target_block, will be deleted in next_block
4604        // So it should have a value (the value that will be deleted)
4605        let storage2_update = storage_updates
4606            .storage_nodes
4607            .iter()
4608            .find(|(n, _)| n == &storage_nibbles2)
4609            .expect("Should find storage_nibbles2");
4610        assert!(
4611            storage2_update.1.is_some(),
4612            "storage_nibbles2 should have a value (the node that will be deleted in next block)"
4613        );
4614        assert_eq!(
4615            storage2_update.1.as_ref().unwrap().state_mask,
4616            storage_node2.state_mask,
4617            "storage_nibbles2 should have the value that was created and will be deleted"
4618        );
4619    }
4620}