reth_provider/providers/database/provider.rs

use crate::{
    bundle_state::StorageRevertsIter,
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
    StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
    StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
    TransactionsProviderExt, TrieWriter,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta, TxHashRef},
    BlockHeader, TxReceipt,
};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates};
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        ShardedKey, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, GotExpected, RecoveredBlock, SealedHeader,
    StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
    StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
    updates::{StorageTrieUpdates, TrieUpdates},
    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeInclusive},
    sync::Arc,
};
use tracing::{debug, trace};

/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
/// Once that issue is solved, we can probably revert back to being an alias type.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commit database transaction and static file if it exists.
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

    /// Consume `DbTx` or `DbTxMut`.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }
}
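
// NOTE (illustrative sketch, not part of the original file): `DatabaseProviderRW`
// derefs to the inner `DatabaseProvider`, so read methods are available directly on it,
// and `commit` consumes the wrapper to persist both the database transaction and any
// pending static file changes. Assuming a `ProviderFactory<N>` value named `factory`
// (a hypothetical binding for this example):
//
//     let provider_rw = factory.provider_rw()?;     // DatabaseProviderRW<DB, N>
//     let tip = provider_rw.last_block_number()?;   // via Deref to DatabaseProvider
//     provider_rw.commit()?;                        // commit db tx and static files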

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec
    chain_spec: Arc<N::ChainSpec>,
    /// Static File provider
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns reference to prune modes.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// State provider for latest state
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// State provider for the given block hash
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If we pruned account or storage history, we can't return state for every historical
        // block. Instead, we cap it at the latest prune checkpoint for the corresponding prune
        // segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    /// Sets the prune modes for the provider.
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}
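
// NOTE (illustrative sketch, not part of the original file): `history_by_block_hash`
// returns state *as of* the given block, which is why the changeset lookup above starts
// at `block_number + 1`, and why pruned history caps how far back it can serve.
// Assuming a provider `provider`, the hash of an older canonical block `old_hash`, and
// an `address` of interest (hypothetical bindings for this example):
//
//     let state = provider.history_by_block_hash(old_hash)?;
//     let account = state.basic_account(&address)?;   // account state as of `old_hash`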

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    /// Returns a static file provider
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-write transaction.
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Writes executed blocks and state to storage.
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            debug!(target: "providers::db", "Attempted to write empty block range");
            return Ok(())
        }

        // NOTE: checked non-empty above
        let first_block = blocks.first().unwrap().recovered_block();

        let last_block = blocks.last().unwrap().recovered_block();
        let first_number = first_block.number();
        let last_block_number = last_block.number();

        debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage");

        // TODO: Do performant / batched writes for each type of object
        // instead of a loop over all blocks,
        // meaning:
        //  * blocks
        //  * state
        //  * hashed state
        //  * trie updates (cannot naively extend, need helper)
        //  * indices (already done basically)
        // Insert the blocks
        for ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock { recovered_block, execution_output, hashed_state },
            trie,
        } in blocks
        {
            let block_hash = recovered_block.hash();
            self.insert_block(Arc::unwrap_or_clone(recovered_block))?;

            // Write state and changesets to the database.
            // Must be written after blocks because of the receipt lookup.
            self.write_state(&execution_output, OriginalValuesKnown::No)?;

            // insert hashes and intermediate merkle nodes
            self.write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?;
            self.write_trie_updates(
                trie.as_ref().ok_or(ProviderError::MissingTrieUpdates(block_hash))?,
            )?;
        }

        // update history indices
        self.update_history_indices(first_number..=last_block_number)?;

        // Update pipeline progress
        self.update_pipeline_stages(last_block_number, false)?;

        debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }

    /// Unwinds trie state for the given range.
    ///
    /// This includes calculating the resulting state root and comparing it with the parent block
    /// state root.
    pub fn unwind_trie_state_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range.clone())?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind account hashes. Add changed accounts to account prefix set.
        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
        let mut destroyed_accounts = HashSet::default();
        for (hashed_address, account) in hashed_addresses {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            if account.is_none() {
                destroyed_accounts.insert(hashed_address);
            }
        }

        // Unwind account history indices.
        self.unwind_account_history_indices(changed_accounts.iter())?;
        let storage_range = BlockNumberAddress::range(range.clone());

        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
        // sets.
        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
        for (hashed_address, hashed_slots) in storage_entries {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
            for slot in hashed_slots {
                storage_prefix_set.insert(Nibbles::unpack(slot));
            }
            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
        }

        // Unwind storage history indices.
        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // Calculate the reverted merkle root.
        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
        // are pre-loaded.
        let prefix_sets = TriePrefixSets {
            account_prefix_set: account_prefix_set.freeze(),
            storage_prefix_sets,
            destroyed_accounts,
        };
        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
            .with_prefix_sets(prefix_sets)
            .root_with_updates()
            .map_err(reth_db_api::DatabaseError::from)?;

        let parent_number = range.start().saturating_sub(1);
        let parent_state_root = self
            .header_by_number(parent_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
            .state_root();

        // The state root should always be correct as we are reverting state.
        // But for the sake of double verification we check it again.
        if new_state_root != parent_state_root {
            let parent_hash = self
                .block_hash(parent_number)?
                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
                root: GotExpected { got: new_state_root, expected: parent_state_root },
                block_number: parent_number,
                block_hash: parent_hash,
            })))
        }
        self.write_trie_updates(&trie_updates)?;

        Ok(())
    }

    /// Removes receipts from all transactions starting with the provided number (inclusive).
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        // iterate over block body and remove receipts
        self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;

        if !self.prune_modes.has_receipts_pruning() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}
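
// NOTE (worked example, added for clarity): in `remove_receipts_from` above, the number
// of receipts pruned from static files is `(highest_static_tx + 1) - from_tx`. For
// instance, if the highest receipt in static files has tx number 99 and `from_tx == 90`,
// then `to_delete = (99 + 1) - 90 = 10`, i.e. receipts 90..=99 are pruned. If `from_tx`
// is already above the static file height, `saturating_sub` yields 0 and nothing is
// pruned from static files.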

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        // if the block number is the same as the current best block number on disk we can use
        // the latest state provider here
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // If we pruned account or storage history, we can't return state for every historical
        // block. Instead, we cap it at the latest prune checkpoint for the corresponding prune
        // segment.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

/// For a given key, unwind all history shards that contain block numbers at or above the given
/// block number.
///
/// S - Sharded key subtype.
/// T - Table to walk over.
/// C - Cursor implementation.
///
/// This function walks the entries from the given start key and deletes all shards that belong to
/// the key and contain block numbers at or above the given block number. Shards entirely below
/// the block number are preserved.
///
/// The boundary shard (the shard that spans across the block number) is removed from the database.
/// Any indices that are below the block number are filtered out and returned for reinsertion.
/// The boundary shard is returned for reinsertion (if it's not empty).
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    // Start from the given key and iterate through shards
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, break.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }

        // Always delete the current shard from the database first
        // We'll decide later what (if anything) to reinsert
        cursor.delete_current()?;

        // Get the first (lowest) block number in this shard
        // All block numbers in a shard are sorted in ascending order
        let first = list.iter().next().expect("List can't be empty");

        // Case 1: Entire shard is at or above the unwinding point
        // Keep it deleted (don't return anything for reinsertion)
        if first >= block_number {
            item = cursor.prev()?;
            continue
        }
        // Case 2: This is a boundary shard (spans across the unwinding point)
        // The shard contains some blocks below and some at/above the unwinding point
        else if block_number <= sharded_key.as_ref().highest_block_number {
            // Return only the block numbers that are below the unwinding point
            // These will be reinserted to preserve the historical data
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        // Case 3: Entire shard is below the unwinding point
        // Return all block numbers for reinsertion (preserve entire shard)
        return Ok(list.iter().collect::<Vec<_>>())
    }

    // No shards found or all processed
    Ok(Vec::new())
}
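
// NOTE (worked example, added for clarity): suppose a key's history is stored in two
// shards, `[100, 200, 300]` keyed by highest block 300 and `[400, 500]` keyed by
// `u64::MAX`, and we unwind to block 250. Starting from the `u64::MAX` shard and walking
// backwards with `prev`:
//   * `[400, 500]`: first (400) >= 250, so the shard stays deleted (case 1);
//   * `[100, 200, 300]`: 250 <= 300, a boundary shard, so `[100, 200]` is returned for
//     reinsertion (case 2).
// Case 3 returns a whole shard for reinsertion when it lies entirely below the unwind
// point.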

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }

    /// Consume `DbTx` or `DbTxMut`.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Pass `DbTx` or `DbTxMut` mutable reference.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Pass `DbTx` or `DbTxMut` immutable reference.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain specification.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // Get the block body
        //
        // If the body indices are not found, this means that the transactions either do not exist
        // in the database yet, or they do exist but are not indexed. If they exist but are not
        // indexed, we don't have enough information to return the block anyway, so we return
        // `None`.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

    /// Returns a range of blocks from the database.
    ///
    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
    /// construct blocks from the following inputs:
    ///     - Header
    ///     - Range of transaction numbers
    ///     - Ommers
    ///     - Withdrawals
    ///     - Senders
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;

        // If the body indices are not found, this means that the transactions either do
        // not exist in the database yet, or they do exist but are
        // not indexed. If they exist but are not indexed, we don't
        // have enough information to return the block anyway, so
        // we skip the block.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range(tx_range.clone())?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

    /// Returns a range of blocks from the database, along with the senders of each
    /// transaction in the blocks.
    ///
    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
    /// construct blocks from the following inputs:
    ///     - Header
    ///     - Transactions
    ///     - Ommers
    ///     - Withdrawals
    ///     - Senders
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                // fetch senders from the senders table
                let known_senders =
                    senders_cursor
                        .walk_range(tx_range.clone())?
                        .collect::<Result<HashMap<_, _>, _>>()?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // recover the sender from the transaction if not found
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
    /// account changesets.
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Iterate over the previous values and fetch the plain state values to create the
        // changeset. The double option around `Account` represents whether the account state is
        // known (first option) and whether the account was removed (second option).
        let mut state: BundleStateInit = HashMap::default();

        // NOTE: this does not work for blocks that are not at the tip, as the plain state is not
        // the state at the end of the range. We should rename these functions or add support for
        // accessing historical state. Accessing historical state can be tricky, but we are not
        // gaining anything.

        let mut reverts: RevertsInit = HashMap::default();

        // add account changeset changes
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}
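
// NOTE (added for clarity, describing the tuple layout used above): `BundleStateInit`
// maps `address -> (old_info, new_info, storage)`, where `old_info` comes from the
// changesets and `new_info` from the plain state cursor, both `Option<Account>` with
// `None` meaning "did not exist on that side". For example, an account created inside
// the range appears as `(None, Some(new_info), ..)`, while a destroyed one appears as
// `(Some(old_info), None, ..)`; `storage` maps each slot to its `(old, new)` values.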

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Load shard and remove it. If list is empty, last shard was full or
    /// there are no shards at all.
    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            // delete old shard so new one can be inserted.
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

    /// Insert history index to the database.
    ///
    /// For each updated partial key, this function removes the last shard from
    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
    /// inserts the new shards back into the database.
    ///
    /// This function is used by history indexing stages.
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            // Chunk indices and insert them in shards of N size.
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    // Insert last list with `u64::MAX`.
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}
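
// NOTE (worked example, added for clarity): `append_history_index` re-chunks the merged
// index list into shards of `sharded_key::NUM_OF_INDICES_IN_SHARD` entries. Full shards
// are keyed by their highest block number, while the last (possibly partial) shard is
// keyed by `u64::MAX` so that the next append can find and extend it. With a
// hypothetical shard size of 3 and indices `[1, 2, 3, 4]`:
//   (partial_key, 3)        -> [1, 2, 3]
//   (partial_key, u64::MAX) -> [4]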

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }

    fn get_account_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
    ) -> ProviderResult<Option<AccountBeforeTx>> {
        self.tx
            .cursor_dup_read::<tables::AccountChangeSets>()?
            .seek_by_key_subkey(block_number, address)?
            .filter(|acc| acc.address == address)
            .map(Ok)
            .transpose()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure the Headers static file is at the same height. If it's further ahead, a
        // previous execution was interrupted and we need to unwind the static file.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The node shut down between an executed static file commit and the database
            // commit, so we need to unwind the static files.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Since this is a database <-> static file inconsistency, we commit the change
                // straight away.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There are either missing or corrupted files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.header_by_number(num)
    }

    fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
        if let Some(num) = self.block_number(block_hash)? {
            self.header_td_by_number(num)
        } else {
            Ok(None)
        }
    }

    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
        if self.chain_spec.is_paris_active_at_block(number) &&
            let Some(td) = self.chain_spec.final_paris_total_difficulty()
        {
            // if this block is higher than the final paris(merge) block, return the final paris
            // difficulty
            return Ok(Some(td))
        }

        self.static_file_provider.header_td_by_number(number)
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.headers_range(range)
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_header(number)
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.sealed_headers_while(range, predicate)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.block_hash(number)
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.canonical_hashes_range(start, end)
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        // The best block number is tracked via the finished stage, which gets updated in the
        // same tx when new blocks are committed
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        self.static_file_provider.last_block_number()
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Returns the block with matching number from the database.
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? &&
            let Some(header) = self.header_by_number(number)?
        {
            // If the body indices are not found, this means that the transactions either do not
            // exist in the database yet, or they do exist but are not indexed.
            // If they exist but are not indexed, we don't have enough
            // information to return the block anyway, so we return `None`.
            let Some(transactions) = self.transactions_by_block(number.into())? else {
                return Ok(None)
            };

            let body = self
                .storage
                .reader()
                .read_block_bodies(self, vec![(&header, transactions)])?
                .pop()
                .ok_or(ProviderError::InvalidStorageOutput)?;

            return Ok(Some(Self::Block::new(header, body)))
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block with senders with matching number or hash from the database.
    ///
    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
    /// spot, and we want fast querying.**
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Ok(self
            .tx
            .cursor_read::<tables::TransactionBlocks>()?
            .seek(id)
            .map(|b| b.map(|(_, bn)| bn))?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Recovers transaction hashes by walking through `Transactions` table and
    /// calculating them in a parallel manner. Returned unsorted.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.transaction_hashes_by_range(tx_range)
    }
}

// Calculates the hash of the given transaction
impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id(id)
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.transaction_by_id_unhashed(id)
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        if let Some(transaction_id) = self.transaction_id(tx_hash)? &&
            let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? &&
            let Some(block_number) = self.block_by_transaction_id(transaction_id)? &&
            let Some(sealed_header) = self.sealed_header(block_number)?
        {
            let (header, block_hash) = sealed_header.split();
            if let Some(block_body) = self.block_body_indices(block_number)? {
                // the index of the tx in the block is the offset:
                // len([start..tx_id])
                // NOTE: `transaction_id` is always `>=` the block's first
                // index
                let index = transaction_id - block_body.first_tx_num();

                let meta = TransactionMeta {
                    tx_hash,
                    index,
                    block_hash,
                    block_number,
                    base_fee: header.base_fee_per_gas(),
                    excess_blob_gas: header.excess_blob_gas(),
                    timestamp: header.timestamp(),
                };

                return Ok(Some((transaction, meta)))
            }
        }

        Ok(None)
    }

    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        if let Some(block_number) = self.convert_hash_or_number(id)? &&
            let Some(body) = self.block_body_indices(block_number)?
        {
            let tx_range = body.tx_num_range();
            return if tx_range.is_empty() {
                Ok(Some(Vec::new()))
            } else {
                self.transactions_by_tx_range(tx_range).map(Some)
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    self.transactions_by_tx_range(tx_num_range)
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.static_file_provider.transactions_by_tx_range(range)
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        self.cursor_read_collect::<tables::TransactionSenders>(range)
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
    }
}
1384
1385impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1386    type Receipt = ReceiptTy<N>;
1387
1388    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1389        self.static_file_provider.get_with_static_file_or_database(
1390            StaticFileSegment::Receipts,
1391            id,
1392            |static_file| static_file.receipt(id),
1393            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1394        )
1395    }
1396
1397    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1398        if let Some(id) = self.transaction_id(hash)? {
1399            self.receipt(id)
1400        } else {
1401            Ok(None)
1402        }
1403    }
1404
1405    fn receipts_by_block(
1406        &self,
1407        block: BlockHashOrNumber,
1408    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1409        if let Some(number) = self.convert_hash_or_number(block)? &&
1410            let Some(body) = self.block_body_indices(number)?
1411        {
1412            let tx_range = body.tx_num_range();
1413            return if tx_range.is_empty() {
1414                Ok(Some(Vec::new()))
1415            } else {
1416                self.receipts_by_tx_range(tx_range).map(Some)
1417            }
1418        }
1419        Ok(None)
1420    }
1421
1422    fn receipts_by_tx_range(
1423        &self,
1424        range: impl RangeBounds<TxNumber>,
1425    ) -> ProviderResult<Vec<Self::Receipt>> {
1426        self.static_file_provider.get_range_with_static_file_or_database(
1427            StaticFileSegment::Receipts,
1428            to_range(range),
1429            |static_file, range, _| static_file.receipts_by_tx_range(range),
1430            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1431            |_| true,
1432        )
1433    }
1434
1435    fn receipts_by_block_range(
1436        &self,
1437        block_range: RangeInclusive<BlockNumber>,
1438    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1439        if block_range.is_empty() {
1440            return Ok(Vec::new());
1441        }
1442
1443        // collect block body indices for each block in the range
1444        let mut block_body_indices = Vec::new();
1445        for block_num in block_range {
1446            if let Some(indices) = self.block_body_indices(block_num)? {
1447                block_body_indices.push(indices);
1448            } else {
1449                // use default indices for missing blocks (empty block)
1450                block_body_indices.push(StoredBlockBodyIndices::default());
1451            }
1452        }
1453
1454        if block_body_indices.is_empty() {
1455            return Ok(Vec::new());
1456        }
1457
1458        // find blocks with transactions to determine transaction range
1459        let non_empty_blocks: Vec<_> =
1460            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1461
1462        if non_empty_blocks.is_empty() {
1463            // all blocks are empty
1464            return Ok(vec![Vec::new(); block_body_indices.len()]);
1465        }
1466
1467        // calculate the overall transaction range
1468        let first_tx = non_empty_blocks[0].first_tx_num();
1469        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1470
1471        // fetch all receipts in the transaction range
1472        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1473        let mut receipts_iter = all_receipts.into_iter();
1474
1475        // distribute receipts to their respective blocks
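        // For example (hypothetical counts): three blocks with tx_counts [2, 0, 3] and five fetched
        // receipts r0..r4 are distributed as [[r0, r1], [], [r2, r3, r4]].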
1476        let mut result = Vec::with_capacity(block_body_indices.len());
1477        for indices in &block_body_indices {
1478            if indices.tx_count == 0 {
1479                result.push(Vec::new());
1480            } else {
1481                let block_receipts =
1482                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1483                result.push(block_receipts);
1484            }
1485        }
1486
1487        Ok(result)
1488    }
1489}
1490
1491impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1492    for DatabaseProvider<TX, N>
1493{
1494    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1495        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1496    }
1497
1498    fn block_body_indices_range(
1499        &self,
1500        range: RangeInclusive<BlockNumber>,
1501    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1502        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1503    }
1504}
1505
1506impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1507    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1508        Ok(if let Some(encoded) = id.get_pre_encoded() {
1509            self.tx.get_by_encoded_key::<tables::StageCheckpoints>(encoded)?
1510        } else {
1511            self.tx.get::<tables::StageCheckpoints>(id.to_string())?
1512        })
1513    }
1514
1515    /// Get stage checkpoint progress.
1516    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1517        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1518    }
1519
1520    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1521        self.tx
1522            .cursor_read::<tables::StageCheckpoints>()?
1523            .walk(None)?
1524            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1525            .map_err(ProviderError::Database)
1526    }
1527}
1528
1529impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1530    /// Save stage checkpoint.
1531    fn save_stage_checkpoint(
1532        &self,
1533        id: StageId,
1534        checkpoint: StageCheckpoint,
1535    ) -> ProviderResult<()> {
1536        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1537    }
1538
1539    /// Save stage checkpoint progress.
1540    fn save_stage_checkpoint_progress(
1541        &self,
1542        id: StageId,
1543        checkpoint: Vec<u8>,
1544    ) -> ProviderResult<()> {
1545        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1546    }
1547
1548    fn update_pipeline_stages(
1549        &self,
1550        block_number: BlockNumber,
1551        drop_stage_checkpoint: bool,
1552    ) -> ProviderResult<()> {
1553        // Iterate over all existing stages in the table and update their progress.
1554        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1555        for stage_id in StageId::ALL {
1556            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1557            cursor.upsert(
1558                stage_id.to_string(),
1559                &StageCheckpoint {
1560                    block_number,
1561                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1562                },
1563            )?;
1564        }
1565
1566        Ok(())
1567    }
1568}
1569
1570impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1571    fn plain_state_storages(
1572        &self,
1573        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1574    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1575        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1576
1577        addresses_with_keys
1578            .into_iter()
1579            .map(|(address, storage)| {
1580                storage
1581                    .into_iter()
1582                    .map(|key| -> ProviderResult<_> {
1583                        Ok(plain_storage
1584                            .seek_by_key_subkey(address, key)?
1585                            .filter(|v| v.key == key)
1586                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1587                    })
1588                    .collect::<ProviderResult<Vec<_>>>()
1589                    .map(|storage| (address, storage))
1590            })
1591            .collect::<ProviderResult<Vec<(_, _)>>>()
1592    }
1593
1594    fn changed_storages_with_range(
1595        &self,
1596        range: RangeInclusive<BlockNumber>,
1597    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1598        self.tx
1599            .cursor_read::<tables::StorageChangeSets>()?
1600            .walk_range(BlockNumberAddress::range(range))?
1601            // Fold all storages and save their old state so we can remove them from HashedStorage;
1602            // this is needed because it is a dup-sorted table.
1603            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1604                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1605                accounts.entry(address).or_default().insert(storage_entry.key);
1606                Ok(accounts)
1607            })
1608    }
1609
1610    fn changed_storages_and_blocks_with_range(
1611        &self,
1612        range: RangeInclusive<BlockNumber>,
1613    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1614        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1615
1616        let storage_changeset_lists =
1617            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1618                BTreeMap::new(),
1619                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1620                    let (index, storage) = entry?;
1621                    storages
1622                        .entry((index.address(), storage.key))
1623                        .or_default()
1624                        .push(index.block_number());
1625                    Ok(storages)
1626                },
1627            )?;
1628
1629        Ok(storage_changeset_lists)
1630    }
1631}
1632
1633impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1634    for DatabaseProvider<TX, N>
1635{
1636    type Receipt = ReceiptTy<N>;
1637
1638    fn write_state(
1639        &self,
1640        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1641        is_value_known: OriginalValuesKnown,
1642    ) -> ProviderResult<()> {
1643        let first_block = execution_outcome.first_block();
1644        let block_count = execution_outcome.len() as u64;
1645        let last_block = execution_outcome.last_block();
1646        let block_range = first_block..=last_block;
1647
1648        let tip = self.last_block_number()?.max(last_block);
1649
1650        let (plain_state, reverts) =
1651            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1652
1653        self.write_state_reverts(reverts, first_block)?;
1654        self.write_state_changes(plain_state)?;
1655
1656        // Fetch the first transaction number for each block in the range
1657        let block_indices: Vec<_> = self
1658            .block_body_indices_range(block_range)?
1659            .into_iter()
1660            .map(|b| b.first_tx_num)
1661            .collect();
1662
1663        // Ensure all expected blocks are present.
1664        if block_indices.len() < block_count as usize {
1665            let missing_blocks = block_count - block_indices.len() as u64;
1666            return Err(ProviderError::BlockBodyIndicesNotFound(
1667                last_block.saturating_sub(missing_blocks - 1),
1668            ));
1669        }
1670
1671        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1672
1673        // Prepare the receipts cursor if we are going to write receipts to the database
1674        //
1675        // We write receipts to the database only if some form of receipt pruning is
1676        // configured
1677        let mut receipts_cursor = has_receipts_pruning
            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
            .transpose()?;
1678
1679        // Prepare receipts static writer if we are going to write receipts to static files
1680        //
1681        // We are writing to static files if requested and if there's no receipt pruning configured
1682        let mut receipts_static_writer = has_receipts_pruning
1683            .not()
1684            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1685            .transpose()?;
1686
1687        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1688        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1689
1690        // All receipts from the last 128 blocks are required for blockchain tree, even with
1691        // [`PruneSegment::ContractLogs`].
1692        let prunable_receipts =
1693            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
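        // For example (hypothetical numbers): with MINIMUM_PRUNING_DISTANCE = 128 and tip = 1000,
        // receipts are only considered prunable when `first_block` is far enough below the tip
        // (roughly at or below block 872); the exact boundary is decided by `should_prune`.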
1694
1695        // Prepare set of addresses which logs should not be pruned.
1696        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1697        for (_, addresses) in contract_log_pruner.range(..first_block) {
1698            allowed_addresses.extend(addresses.iter().copied());
1699        }
1700
1701        for (idx, (receipts, first_tx_index)) in
1702            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1703        {
1704            let block_number = first_block + idx as u64;
1705
1706            // Increment block number for receipts static file writer
1707            if let Some(writer) = receipts_static_writer.as_mut() {
1708                writer.increment_block(block_number)?;
1709            }
1710
1711            // Skip writing receipts if pruning configuration requires us to.
1712            if prunable_receipts &&
1713                self.prune_modes
1714                    .receipts
1715                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1716            {
1717                continue
1718            }
1719
1720            // If there are new addresses to retain after this block number, track them
1721            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1722                allowed_addresses.extend(new_addresses.iter().copied());
1723            }
1724
1725            for (idx, receipt) in receipts.iter().enumerate() {
1726                let receipt_idx = first_tx_index + idx as u64;
1727                // Skip writing receipt if log filter is active and it does not have any logs to
1728                // retain
1729                if prunable_receipts &&
1730                    has_contract_log_filter &&
1731                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1732                {
1733                    continue
1734                }
1735
1736                if let Some(writer) = &mut receipts_static_writer {
1737                    writer.append_receipt(receipt_idx, receipt)?;
1738                }
1739
1740                if let Some(cursor) = &mut receipts_cursor {
                    cursor.append(receipt_idx, receipt)?;
                }
1741            }
1742        }
1743
1744        Ok(())
1745    }
1746
1747    fn write_state_reverts(
1748        &self,
1749        reverts: PlainStateReverts,
1750        first_block: BlockNumber,
1751    ) -> ProviderResult<()> {
1752        // Write storage changes
1753        tracing::trace!("Writing storage changes");
1754        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1755        let mut storage_changeset_cursor =
1756            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1757        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1758            let block_number = first_block + block_index as BlockNumber;
1759
1760            tracing::trace!(block_number, "Writing block change");
1761            // sort changes by address.
1762            storage_changes.par_sort_unstable_by_key(|a| a.address);
1763            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1764                let storage_id = BlockNumberAddress((block_number, address));
1765
1766                let mut storage = storage_revert
1767                    .into_iter()
1768                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1769                    .collect::<Vec<_>>();
1770                // sort storage slots by key.
1771                storage.par_sort_unstable_by_key(|a| a.0);
1772
1773                // If we are writing the primary storage wipe transition, the pre-existing plain
1774                // storage state has to be taken from the database and written to storage history.
1775                // See [StorageWipe::Primary] for more details.
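                // For example, if a contract was destroyed in this block, all of its pre-existing
                // slots read from the plain state below end up in the changeset so an unwind can
                // restore them.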
1776                let mut wiped_storage = Vec::new();
1777                if wiped {
1778                    tracing::trace!(?address, "Wiping storage");
1779                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1780                        wiped_storage.push((entry.key, entry.value));
1781                        while let Some(entry) = storages_cursor.next_dup_val()? {
1782                            wiped_storage.push((entry.key, entry.value))
1783                        }
1784                    }
1785                }
1786
1787                tracing::trace!(?address, ?storage, "Writing storage reverts");
1788                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1789                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1790                }
1791            }
1792        }
1793
1794        // Write account changes
1795        tracing::trace!("Writing account changes");
1796        let mut account_changeset_cursor =
1797            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1798
1799        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1800            let block_number = first_block + block_index as BlockNumber;
1801            // Sort accounts by address.
1802            account_block_reverts.par_sort_by_key(|a| a.0);
1803
1804            for (address, info) in account_block_reverts {
1805                account_changeset_cursor.append_dup(
1806                    block_number,
1807                    AccountBeforeTx { address, info: info.map(Into::into) },
1808                )?;
1809            }
1810        }
1811
1812        Ok(())
1813    }
1814
1815    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1816        // Sort all entries so they can be written to the database in a more performant way
1817        // and with a smaller memory footprint.
1818        changes.accounts.par_sort_by_key(|a| a.0);
1819        changes.storage.par_sort_by_key(|a| a.address);
1820        changes.contracts.par_sort_by_key(|a| a.0);
1821
1822        // Write new account state
1823        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1824        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1825        // write account to database.
1826        for (address, account) in changes.accounts {
1827            if let Some(account) = account {
1828                tracing::trace!(?address, "Updating plain state account");
1829                accounts_cursor.upsert(address, &account.into())?;
1830            } else if accounts_cursor.seek_exact(address)?.is_some() {
1831                tracing::trace!(?address, "Deleting plain state account");
1832                accounts_cursor.delete_current()?;
1833            }
1834        }
1835
1836        // Write bytecode
1837        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1838        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1839        for (hash, bytecode) in changes.contracts {
1840            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1841        }
1842
1843        // Write new storage state and wipe storage if needed.
1844        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1845        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1846        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1847            // Wiping of storage.
1848            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1849                storages_cursor.delete_current_duplicates()?;
1850            }
1851            // cast storages to B256.
1852            let mut storage = storage
1853                .into_iter()
1854                .map(|(k, value)| StorageEntry { key: k.into(), value })
1855                .collect::<Vec<_>>();
1856            // sort storage slots by key.
1857            storage.par_sort_unstable_by_key(|a| a.key);
1858
1859            for entry in storage {
1860                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1861                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? &&
1862                    db_entry.key == entry.key
1863                {
1864                    storages_cursor.delete_current()?;
1865                }
1866
1867                if !entry.value.is_zero() {
1868                    storages_cursor.upsert(address, &entry)?;
1869                }
1870            }
1871        }
1872
1873        Ok(())
1874    }
1875
1876    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1877        // Write hashed account updates.
1878        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1879        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
1880            if let Some(account) = account {
1881                hashed_accounts_cursor.upsert(hashed_address, &account)?;
1882            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
1883                hashed_accounts_cursor.delete_current()?;
1884            }
1885        }
1886
1887        // Write hashed storage changes.
1888        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
1889        let mut hashed_storage_cursor =
1890            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
1891        for (hashed_address, storage) in sorted_storages {
1892            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
1893                hashed_storage_cursor.delete_current_duplicates()?;
1894            }
1895
1896            for (hashed_slot, value) in storage.storage_slots_sorted() {
1897                let entry = StorageEntry { key: hashed_slot, value };
1898                if let Some(db_entry) =
1899                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)? &&
1900                    db_entry.key == entry.key
1901                {
1902                    hashed_storage_cursor.delete_current()?;
1903                }
1904
1905                if !entry.value.is_zero() {
1906                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
1907                }
1908            }
1909        }
1910
1911        Ok(())
1912    }
1913
1914    /// Remove the last N blocks of state.
1915    ///
1916    /// The latest state will be unwound.
1917    ///
1918    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
1919    ///    transaction ids.
1920    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
1921    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
1922    ///    the changesets.
1923    ///    - In order to have both the old and new values in the changesets, we also access the
1924    ///      plain state tables.
1925    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
1926    ///    we:
1927    ///     1. Take the old value from the changeset
1928    ///     2. Take the new value from the plain state
1929    ///     3. Save the old value to the local state
1930    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
1931    ///    have seen before we:
1932    ///     1. Take the old value from the changeset
1933    ///     2. Take the new value from the local state
1934    ///     3. Set the local state to the value in the changeset
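    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming a read-write `provider`); this unwinds all state above
    /// block 100:
    ///
    /// ```ignore
    /// provider.remove_state_above(100)?;
    /// ```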
1935    fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> {
1936        let range = block + 1..=self.last_block_number()?;
1937
1938        if range.is_empty() {
1939            return Ok(());
1940        }
1941
1942        // We are not removing block meta as it is used to get block changesets.
1943        let block_bodies = self.block_body_indices_range(range.clone())?;
1944
1945        // get transaction receipts
1946        let from_transaction_num =
1947            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
1948
1949        let storage_range = BlockNumberAddress::range(range.clone());
1950
1951        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
1952        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
1953
1954        // This does not work for blocks that are not at the tip, as the plain state is not the
1955        // state at the end of the range. We should rename these functions or add support for
1956        // accessing historical state. Accessing historical state can be tricky, but we would not
1957        // gain anything from it here.
1958        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
1959        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
1960
1961        let (state, _) = self.populate_bundle_state(
1962            account_changeset,
1963            storage_changeset,
1964            &mut plain_accounts_cursor,
1965            &mut plain_storage_cursor,
1966        )?;
1967
1968        // Iterate over the local plain state and revert all accounts and storages.
1969        for (address, (old_account, new_account, storage)) in &state {
1970            // revert account if needed.
1971            if old_account != new_account {
1972                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
1973                if let Some(account) = old_account {
1974                    plain_accounts_cursor.upsert(*address, account)?;
1975                } else if existing_entry.is_some() {
1976                    plain_accounts_cursor.delete_current()?;
1977                }
1978            }
1979
1980            // revert storages
1981            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
1982                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
1983                // delete previous value
1984                // TODO: This does not use dupsort features
1985                if plain_storage_cursor
1986                    .seek_by_key_subkey(*address, *storage_key)?
1987                    .filter(|s| s.key == *storage_key)
1988                    .is_some()
1989                {
1990                    plain_storage_cursor.delete_current()?
1991                }
1992
1993                // insert value if needed
1994                if !old_storage_value.is_zero() {
1995                    plain_storage_cursor.upsert(*address, &storage_entry)?;
1996                }
1997            }
1998        }
1999
2000        self.remove_receipts_from(from_transaction_num, block)?;
2001
2002        Ok(())
2003    }
2004
2005    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2006    ///
2007    /// The latest state will be unwound and returned together with all the blocks
2008    ///
2009    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2010    ///    transaction ids.
2011    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2012    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2013    ///    the changesets.
2014    ///    - In order to have both the old and new values in the changesets, we also access the
2015    ///      plain state tables.
2016    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2017    ///    we:
2018    ///     1. Take the old value from the changeset
2019    ///     2. Take the new value from the plain state
2020    ///     3. Save the old value to the local state
2021    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2022    ///    have seen before we:
2023    ///     1. Take the old value from the changeset
2024    ///     2. Take the new value from the local state
2025    ///     3. Set the local state to the value in the changeset
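    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming a read-write `provider`); this unwinds everything above
    /// block 100 and returns the corresponding [`ExecutionOutcome`]:
    ///
    /// ```ignore
    /// let outcome = provider.take_state_above(100)?;
    /// // If blocks above 100 existed, `outcome.first_block()` is 101.
    /// ```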
2026    fn take_state_above(
2027        &self,
2028        block: BlockNumber,
2029    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2030        let range = block + 1..=self.last_block_number()?;
2031
2032        if range.is_empty() {
2033            return Ok(ExecutionOutcome::default())
2034        }
2035        let start_block_number = *range.start();
2036
2037        // We are not removing block meta as it is used to get block changesets.
2038        let block_bodies = self.block_body_indices_range(range.clone())?;
2039
2040        // get transaction receipts
2041        let from_transaction_num =
2042            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2043        let to_transaction_num =
2044            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2045
2046        let storage_range = BlockNumberAddress::range(range.clone());
2047
2048        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2049        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2050
2051        // This does not work for blocks that are not at the tip, as the plain state is not the
2052        // state at the end of the range. We should rename these functions or add support for
2053        // accessing historical state. Accessing historical state can be tricky, but we would not
2054        // gain anything from it here.
2055        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2056        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2057
2058        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2059        // remove, and return later
2060        let (state, reverts) = self.populate_bundle_state(
2061            account_changeset,
2062            storage_changeset,
2063            &mut plain_accounts_cursor,
2064            &mut plain_storage_cursor,
2065        )?;
2066
2067        // Iterate over the local plain state and revert all accounts and storages.
2068        for (address, (old_account, new_account, storage)) in &state {
2069            // revert account if needed.
2070            if old_account != new_account {
2071                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2072                if let Some(account) = old_account {
2073                    plain_accounts_cursor.upsert(*address, account)?;
2074                } else if existing_entry.is_some() {
2075                    plain_accounts_cursor.delete_current()?;
2076                }
2077            }
2078
2079            // revert storages
2080            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2081                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2082                // delete previous value
2083                // TODO: This does not use dupsort features
2084                if plain_storage_cursor
2085                    .seek_by_key_subkey(*address, *storage_key)?
2086                    .filter(|s| s.key == *storage_key)
2087                    .is_some()
2088                {
2089                    plain_storage_cursor.delete_current()?
2090                }
2091
2092                // insert value if needed
2093                if !old_storage_value.is_zero() {
2094                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2095                }
2096            }
2097        }
2098
2099        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
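        //
        // For example (hypothetical numbers): if receipts for tx numbers 5..=9 are requested but
        // the receipt for tx 7 was pruned from the database, the iterator yields the pairs for
        // 5, 6, 8 and 9, and the per-block loop below simply skips the missing number.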
2100        let mut receipts_iter = self
2101            .static_file_provider
2102            .get_range_with_static_file_or_database(
2103                StaticFileSegment::Receipts,
2104                from_transaction_num..to_transaction_num + 1,
2105                |static_file, range, _| {
2106                    static_file
2107                        .receipts_by_tx_range(range.clone())
2108                        .map(|r| range.into_iter().zip(r).collect())
2109                },
2110                |range, _| {
2111                    self.tx
2112                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2113                        .walk_range(range)?
2114                        .map(|r| r.map_err(Into::into))
2115                        .collect()
2116                },
2117                |_| true,
2118            )?
2119            .into_iter()
2120            .peekable();
2121
2122        let mut receipts = Vec::with_capacity(block_bodies.len());
2123        // Distribute the collected receipts into their respective blocks; receipts pruned from
        // the database are skipped.
2124        for block_body in block_bodies {
2125            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2126            for num in block_body.tx_num_range() {
2127                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2128                    block_receipts.push(receipts_iter.next().unwrap().1);
2129                }
2130            }
2131            receipts.push(block_receipts);
2132        }
2133
2134        self.remove_receipts_from(from_transaction_num, block)?;
2135
2136        Ok(ExecutionOutcome::new_init(
2137            state,
2138            reverts,
2139            Vec::new(),
2140            receipts,
2141            start_block_number,
2142            Vec::new(),
2143        ))
2144    }
2145}
2146
2147impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2148    /// Writes trie updates. Returns the number of entries modified.
2149    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2150        if trie_updates.is_empty() {
2151            return Ok(0)
2152        }
2153
2154        // Track the number of inserted entries.
2155        let mut num_entries = 0;
2156
2157        // Merge updated and removed nodes. Updated nodes must take precedence.
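        // For example, a path present in both `removed_nodes_ref()` and `account_nodes_ref()` is
        // written with its updated node rather than deleted.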
2158        let mut account_updates = trie_updates
2159            .removed_nodes_ref()
2160            .iter()
2161            .filter_map(|n| {
2162                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2163            })
2164            .collect::<Vec<_>>();
2165        account_updates.extend(
2166            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2167        );
2168        // Sort trie node updates.
2169        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2170
2171        let tx = self.tx_ref();
2172        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2173        for (key, updated_node) in account_updates {
2174            let nibbles = StoredNibbles(*key);
2175            match updated_node {
2176                Some(node) => {
2177                    if !nibbles.0.is_empty() {
2178                        num_entries += 1;
2179                        account_trie_cursor.upsert(nibbles, node)?;
2180                    }
2181                }
2182                None => {
2183                    num_entries += 1;
2184                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2185                        account_trie_cursor.delete_current()?;
2186                    }
2187                }
2188            }
2189        }
2190
2191        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref().iter())?;
2192
2193        Ok(num_entries)
2194    }
2195}
2196
2197impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2198    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2199    /// updates by the hashed address, writing in sorted order.
2200    fn write_storage_trie_updates<'a>(
2201        &self,
2202        storage_tries: impl Iterator<Item = (&'a B256, &'a StorageTrieUpdates)>,
2203    ) -> ProviderResult<usize> {
2204        let mut num_entries = 0;
2205        let mut storage_tries = storage_tries.collect::<Vec<_>>();
2206        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2207        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2208        for (hashed_address, storage_trie_updates) in storage_tries {
2209            let mut db_storage_trie_cursor =
2210                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2211            num_entries +=
2212                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2213            cursor = db_storage_trie_cursor.cursor;
2214        }
2215
2216        Ok(num_entries)
2217    }
2218}
2219
2220impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2221    fn unwind_account_hashing<'a>(
2222        &self,
2223        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2224    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2225        // Aggregate all block changesets and make a list of accounts that have been changed.
2226        // Note that collecting and then reversing the order is necessary to ensure that the
2227        // changes are applied in the correct order.
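        // For example, if the same account changed in blocks 10 and 12, reversing before collecting
        // keeps the entry from block 10 (the oldest pre-state), since later inserts into the map
        // overwrite earlier ones.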
2228        let hashed_accounts = changesets
2229            .into_iter()
2230            .map(|(_, e)| (keccak256(e.address), e.info))
2231            .collect::<Vec<_>>()
2232            .into_iter()
2233            .rev()
2234            .collect::<BTreeMap<_, _>>();
2235
2236        // Apply values to HashedState, and remove the account if it's None.
2237        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2238        for (hashed_address, account) in &hashed_accounts {
2239            if let Some(account) = account {
2240                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2241            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2242                hashed_accounts_cursor.delete_current()?;
2243            }
2244        }
2245
2246        Ok(hashed_accounts)
2247    }
2248
2249    fn unwind_account_hashing_range(
2250        &self,
2251        range: impl RangeBounds<BlockNumber>,
2252    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2253        let changesets = self
2254            .tx
2255            .cursor_read::<tables::AccountChangeSets>()?
2256            .walk_range(range)?
2257            .collect::<Result<Vec<_>, _>>()?;
2258        self.unwind_account_hashing(changesets.iter())
2259    }
2260
2261    fn insert_account_for_hashing(
2262        &self,
2263        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2264    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2265        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2266        let hashed_accounts =
2267            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2268        for (hashed_address, account) in &hashed_accounts {
2269            if let Some(account) = account {
2270                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2271            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2272                hashed_accounts_cursor.delete_current()?;
2273            }
2274        }
2275        Ok(hashed_accounts)
2276    }
2277
2278    fn unwind_storage_hashing(
2279        &self,
2280        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2281    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2282        // Aggregate all block changesets and make a list of storage entries that have been changed.
2283        let mut hashed_storages = changesets
2284            .into_iter()
2285            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2286                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2287            })
2288            .collect::<Vec<_>>();
2289        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2290
2291        // Apply values to HashedStorages, and remove the entry if the value is zero.
2292        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2293            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2294        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2295        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2296            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2297
2298            if hashed_storage
2299                .seek_by_key_subkey(hashed_address, key)?
2300                .filter(|entry| entry.key == key)
2301                .is_some()
2302            {
2303                hashed_storage.delete_current()?;
2304            }
2305
2306            if !value.is_zero() {
2307                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2308            }
2309        }
2310        Ok(hashed_storage_keys)
2311    }
2312
2313    fn unwind_storage_hashing_range(
2314        &self,
2315        range: impl RangeBounds<BlockNumberAddress>,
2316    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2317        let changesets = self
2318            .tx
2319            .cursor_read::<tables::StorageChangeSets>()?
2320            .walk_range(range)?
2321            .collect::<Result<Vec<_>, _>>()?;
2322        self.unwind_storage_hashing(changesets.into_iter())
2323    }
2324
2325    fn insert_storage_for_hashing(
2326        &self,
2327        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2328    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2329        // hash values
2330        let hashed_storages =
2331            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2332                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2333                    map.insert(keccak256(entry.key), entry.value);
2334                    map
2335                });
2336                map.insert(keccak256(address), storage);
2337                map
2338            });
2339
2340        let hashed_storage_keys = hashed_storages
2341            .iter()
2342            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2343            .collect();
2344
2345        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2346        // Apply the hashed addresses and keys to HashedStorages (if the value is zero, the
2347        // entry is just removed).
2348        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2349            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2350                if hashed_storage_cursor
2351                    .seek_by_key_subkey(hashed_address, key)?
2352                    .filter(|entry| entry.key == key)
2353                    .is_some()
2354                {
2355                    hashed_storage_cursor.delete_current()?;
2356                }
2357
2358                if !value.is_zero() {
2359                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2360                }
2361                Ok(())
2362            })
2363        })?;
2364
2365        Ok(hashed_storage_keys)
2366    }
2367}
2368
2369impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2370    fn unwind_account_history_indices<'a>(
2371        &self,
2372        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2373    ) -> ProviderResult<usize> {
2374        let mut last_indices = changesets
2375            .into_iter()
2376            .map(|(index, account)| (account.address, *index))
2377            .collect::<Vec<_>>();
2378        last_indices.sort_by_key(|(a, _)| *a);
2379
2380        // Unwind the account history index.
2381        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2382        for &(address, rem_index) in &last_indices {
2383            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2384                &mut cursor,
2385                ShardedKey::last(address),
2386                rem_index,
2387                |sharded_key| sharded_key.key == address,
2388            )?;
2389
2390            // Check the last returned partial shard.
2391            // If it's not empty, the shard needs to be reinserted.
2392            if !partial_shard.is_empty() {
2393                cursor.insert(
2394                    ShardedKey::last(address),
2395                    &BlockNumberList::new_pre_sorted(partial_shard),
2396                )?;
2397            }
2398        }
2399
2400        let changesets = last_indices.len();
2401        Ok(changesets)
2402    }
2403
2404    fn unwind_account_history_indices_range(
2405        &self,
2406        range: impl RangeBounds<BlockNumber>,
2407    ) -> ProviderResult<usize> {
2408        let changesets = self
2409            .tx
2410            .cursor_read::<tables::AccountChangeSets>()?
2411            .walk_range(range)?
2412            .collect::<Result<Vec<_>, _>>()?;
2413        self.unwind_account_history_indices(changesets.iter())
2414    }
2415
2416    fn insert_account_history_index(
2417        &self,
2418        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2419    ) -> ProviderResult<()> {
2420        self.append_history_index::<_, tables::AccountsHistory>(
2421            account_transitions,
2422            ShardedKey::new,
2423        )
2424    }
2425
2426    fn unwind_storage_history_indices(
2427        &self,
2428        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2429    ) -> ProviderResult<usize> {
2430        let mut storage_changesets = changesets
2431            .into_iter()
2432            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2433            .collect::<Vec<_>>();
2434        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2435
2436        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2437        for &(address, storage_key, rem_index) in &storage_changesets {
2438            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2439                &mut cursor,
2440                StorageShardedKey::last(address, storage_key),
2441                rem_index,
2442                |storage_sharded_key| {
2443                    storage_sharded_key.address == address &&
2444                        storage_sharded_key.sharded_key.key == storage_key
2445                },
2446            )?;
2447
2448            // Check the last returned partial shard.
2449            // If it's not empty, the shard needs to be reinserted.
2450            if !partial_shard.is_empty() {
2451                cursor.insert(
2452                    StorageShardedKey::last(address, storage_key),
2453                    &BlockNumberList::new_pre_sorted(partial_shard),
2454                )?;
2455            }
2456        }
2457
2458        let changesets = storage_changesets.len();
2459        Ok(changesets)
2460    }
2461
2462    fn unwind_storage_history_indices_range(
2463        &self,
2464        range: impl RangeBounds<BlockNumberAddress>,
2465    ) -> ProviderResult<usize> {
2466        let changesets = self
2467            .tx
2468            .cursor_read::<tables::StorageChangeSets>()?
2469            .walk_range(range)?
2470            .collect::<Result<Vec<_>, _>>()?;
2471        self.unwind_storage_history_indices(changesets.into_iter())
2472    }
2473
2474    fn insert_storage_history_index(
2475        &self,
2476        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2477    ) -> ProviderResult<()> {
2478        self.append_history_index::<_, tables::StoragesHistory>(
2479            storage_transitions,
2480            |(address, storage_key), highest_block_number| {
2481                StorageShardedKey::new(address, storage_key, highest_block_number)
2482            },
2483        )
2484    }
2485
2486    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2487        // account history stage
2488        {
2489            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2490            self.insert_account_history_index(indices)?;
2491        }
2492
2493        // storage history stage
2494        {
2495            let indices = self.changed_storages_and_blocks_with_range(range)?;
2496            self.insert_storage_history_index(indices)?;
2497        }
2498
2499        Ok(())
2500    }
2501}
2502
2503impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2504    for DatabaseProvider<TX, N>
2505{
2506    fn take_block_and_execution_above(
2507        &self,
2508        block: BlockNumber,
2509    ) -> ProviderResult<Chain<Self::Primitives>> {
2510        let range = block + 1..=self.last_block_number()?;
2511
2512        self.unwind_trie_state_range(range.clone())?;
2513
2514        // get execution res
2515        let execution_state = self.take_state_above(block)?;
2516
2517        let blocks = self.recovered_block_range(range)?;
2518
2519        // Remove block bodies. They are needed both to get the block range and the block
2520        // execution results, which is why they are deleted only afterwards.
2521        self.remove_blocks_above(block)?;
2522
2523        // Update pipeline progress
2524        self.update_pipeline_stages(block, true)?;
2525
2526        Ok(Chain::new(blocks, execution_state, None))
2527    }
2528
2529    fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> {
2530        let range = block + 1..=self.last_block_number()?;
2531
2532        self.unwind_trie_state_range(range)?;
2533
2534        // remove execution res
2535        self.remove_state_above(block)?;
2536
2537        // Remove block bodies. They are needed both to get the block range and the block
2538        // execution results, which is why they are deleted only afterwards.
2539        self.remove_blocks_above(block)?;
2540
2541        // Update pipeline progress
2542        self.update_pipeline_stages(block, true)?;
2543
2544        Ok(())
2545    }
2546}
2547
2548impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2549    for DatabaseProvider<TX, N>
2550{
2551    type Block = BlockTy<N>;
2552    type Receipt = ReceiptTy<N>;
2553
2554    /// Inserts the block into the database, always modifying the following static file segments and
2555    /// tables:
2556    /// * [`StaticFileSegment::Headers`]
2557    /// * [`tables::HeaderNumbers`]
2558    /// * [`tables::HeaderTerminalDifficulties`]
2559    /// * [`tables::BlockBodyIndices`]
2560    ///
2561    /// If there are transactions in the block, the following static file segments and tables will
2562    /// be modified:
2563    /// * [`StaticFileSegment::Transactions`]
2564    /// * [`tables::TransactionBlocks`]
2565    ///
2566    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2567    /// If withdrawals are not empty, this will modify
2568    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2569    ///
2570    /// If the provider has __not__ configured full sender pruning, this will modify
2571    /// [`TransactionSenders`](tables::TransactionSenders).
2572    ///
2573    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2574    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
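    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming a read-write `provider` and an already recovered `block`):
    ///
    /// ```ignore
    /// let indices = provider.insert_block(block)?;
    /// // `indices.tx_count` equals the number of transactions in the inserted block.
    /// ```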
2575    fn insert_block(
2576        &self,
2577        block: RecoveredBlock<Self::Block>,
2578    ) -> ProviderResult<StoredBlockBodyIndices> {
2579        let block_number = block.number();
2580
2581        let mut durations_recorder = metrics::DurationsRecorder::default();
2582
2583        // total difficulty
2584        let ttd = if block_number == 0 {
2585            block.header().difficulty()
2586        } else {
2587            let parent_block_number = block_number - 1;
2588            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2589            durations_recorder.record_relative(metrics::Action::GetParentTD);
2590            parent_ttd + block.header().difficulty()
2591        };
2592
2593        self.static_file_provider
2594            .get_writer(block_number, StaticFileSegment::Headers)?
2595            .append_header(block.header(), ttd, &block.hash())?;
2596
2597        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2598        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2599
2600        let mut next_tx_num = self
2601            .tx
2602            .cursor_read::<tables::TransactionBlocks>()?
2603            .last()?
2604            .map(|(n, _)| n + 1)
2605            .unwrap_or_default();
2606        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2607        let first_tx_num = next_tx_num;
2608
2609        let tx_count = block.body().transaction_count() as u64;
2610
2611        // Ensures we have all the senders for the block's transactions.
2612        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2613            let hash = transaction.tx_hash();
2614
2615            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2616                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2617            }
2618
2619            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2620                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2621            }
2622            next_tx_num += 1;
2623        }
2624
2625        self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?;
2626
2627        debug!(
2628            target: "providers::db",
2629            ?block_number,
2630            actions = ?durations_recorder.actions,
2631            "Inserted block"
2632        );
2633
2634        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2635    }
2636
2637    fn append_block_bodies(
2638        &self,
2639        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2640    ) -> ProviderResult<()> {
2641        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2642
2643        // Initialize the writer for the transactions static file segment
2644        let mut tx_writer =
2645            self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?;
2646
2647        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2648        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2649
2650        // Get id for the next tx_num or zero if there are no transactions.
2651        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2652
2653        for (block_number, body) in &bodies {
2654            // Increment block on static file header.
2655            tx_writer.increment_block(*block_number)?;
2656
2657            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2658            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2659
2660            let mut durations_recorder = metrics::DurationsRecorder::default();
2661
2662            // insert block meta
2663            block_indices_cursor.append(*block_number, &block_indices)?;
2664
2665            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2666
2667            let Some(body) = body else { continue };
2668
2669            // write transaction block index
2670            if !body.transactions().is_empty() {
2671                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2672                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2673            }
2674
2675            // write transactions
2676            for transaction in body.transactions() {
2677                tx_writer.append_transaction(next_tx_num, transaction)?;
2678
2679                // Increment transaction id for each transaction.
2680                next_tx_num += 1;
2681            }
2682        }
2683
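        // Write any remaining chain-specific body contents (e.g. ommers or withdrawals) through
        // the node's configured block body writer.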
2684        self.storage.writer().write_block_bodies(self, bodies)?;
2685
2686        Ok(())
2687    }
2688
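    // Removes all block data above the given block number: header number lookups, static file
    // headers, transaction hash lookups, senders and block bodies.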
2689    fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> {
2690        // Clear the `HeaderNumbers` (hash -> number) entries in MDBX for the blocks being removed.
2691        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2692            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2693        }
2694
2695        // Get the highest block currently stored in the Headers static file segment
2696        let highest_static_file_block = self
2697            .static_file_provider()
2698            .get_highest_static_file_block(StaticFileSegment::Headers)
2699            .expect("todo: error handling, headers should exist");
2700
2701        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block)` to make sure we
2702        // remove only what is ABOVE the given block.
2703        //
2704        // E.g. if the highest static file block is 8 and we remove everything above block 5,
2705        // there are three blocks to remove: 8, 7 and 6.
2706        debug!(target: "providers::db", ?block, "Removing static file blocks above block_number");
2707        self.static_file_provider()
2708            .get_writer(block, StaticFileSegment::Headers)?
2709            .prune_headers(highest_static_file_block.saturating_sub(block))?;
2710
2711        // First transaction to be removed
2712        let unwind_tx_from = self
2713            .block_body_indices(block)?
2714            .map(|b| b.next_tx_num())
2715            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2716
2717        // Last transaction to be removed
2718        let unwind_tx_to = self
2719            .tx
2720            .cursor_read::<tables::BlockBodyIndices>()?
2721            .last()?
2722            // shouldn't happen because `block_body_indices(block)` succeeded above
2723            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2724            .1
2725            .last_tx_num();
2726
2727        if unwind_tx_from <= unwind_tx_to {
2728            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2729                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2730            }
2731        }
2732
2733        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2734
2735        self.remove_bodies_above(block)?;
2736
2737        Ok(())
2738    }
2739
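    // Removes chain-specific body contents, body indices, transaction-block mappings and static
    // file transactions for all blocks above the given block number.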
2740    fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> {
2741        self.storage.writer().remove_block_bodies_above(self, block)?;
2742
2743        // First transaction to be removed
2744        let unwind_tx_from = self
2745            .block_body_indices(block)?
2746            .map(|b| b.next_tx_num())
2747            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2748
2749        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
2750        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
2751
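        // Number of transactions to prune from static files: everything from `unwind_tx_from` up
        // to and including the highest transaction number stored in static files.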
2752        let static_file_tx_num =
2753            self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);
2754
2755        let to_delete = static_file_tx_num
2756            .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
2757            .unwrap_or_default();
2758
2759        self.static_file_provider
2760            .latest_writer(StaticFileSegment::Transactions)?
2761            .prune_transactions(to_delete, block)?;
2762
2763        Ok(())
2764    }
2765
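    // A minimal usage sketch (hypothetical caller; assumes a `ProviderFactory` named `factory`
    // and pre-built inputs):
    //
    //   let provider_rw = factory.provider_rw()?;
    //   provider_rw.append_blocks_with_state(blocks, &execution_outcome, hashed_state)?;
    //   provider_rw.commit()?;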
2766    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
2767    fn append_blocks_with_state(
2768        &self,
2769        blocks: Vec<RecoveredBlock<Self::Block>>,
2770        execution_outcome: &ExecutionOutcome<Self::Receipt>,
2771        hashed_state: HashedPostStateSorted,
2772    ) -> ProviderResult<()> {
2773        if blocks.is_empty() {
2774            debug!(target: "providers::db", "Attempted to append empty block range");
2775            return Ok(())
2776        }
2777
2778        // `blocks` is not empty, so indexing the first element cannot panic.
2780        let first_number = blocks[0].number();
2781
2782        // `blocks` is not empty, so indexing the last element cannot panic.
2784        let last_block_number = blocks[blocks.len() - 1].number();
2785
2786        let mut durations_recorder = metrics::DurationsRecorder::default();
2787
2788        // Insert the blocks
2789        for block in blocks {
2790            self.insert_block(block)?;
2791            durations_recorder.record_relative(metrics::Action::InsertBlock);
2792        }
2793
2794        self.write_state(execution_outcome, OriginalValuesKnown::No)?;
2795        durations_recorder.record_relative(metrics::Action::InsertState);
2796
2797        // insert hashed account and storage state
2798        self.write_hashed_state(&hashed_state)?;
2799        durations_recorder.record_relative(metrics::Action::InsertHashes);
2800
2801        self.update_history_indices(first_number..=last_block_number)?;
2802        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
2803
2804        // Update pipeline progress
2805        self.update_pipeline_stages(last_block_number, false)?;
2806        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
2807
2808        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
2809
2810        Ok(())
2811    }
2812}
2813
2814impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
2815    fn get_prune_checkpoint(
2816        &self,
2817        segment: PruneSegment,
2818    ) -> ProviderResult<Option<PruneCheckpoint>> {
2819        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
2820    }
2821
2822    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
2823        Ok(self
2824            .tx
2825            .cursor_read::<tables::PruneCheckpoints>()?
2826            .walk(None)?
2827            .collect::<Result<_, _>>()?)
2828    }
2829}
2830
2831impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
2832    fn save_prune_checkpoint(
2833        &self,
2834        segment: PruneSegment,
2835        checkpoint: PruneCheckpoint,
2836    ) -> ProviderResult<()> {
2837        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
2838    }
2839}
2840
2841impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
2842    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2843        let db_entries = self.tx.entries::<T>()?;
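        // Static files may not have a segment backing this table; treat `UnsupportedProvider` as
        // zero entries rather than an error.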
2844        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
2845            Ok(entries) => entries,
2846            Err(ProviderError::UnsupportedProvider) => 0,
2847            Err(err) => return Err(err),
2848        };
2849
2850        Ok(db_entries + static_file_entries)
2851    }
2852}
2853
2854impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
2855    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
2856        let mut finalized_blocks = self
2857            .tx
2858            .cursor_read::<tables::ChainState>()?
2859            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
2860            .take(1)
2861            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
2862
2863        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
2864        Ok(last_finalized_block_number)
2865    }
2866
2867    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
2868        let mut safe_blocks = self
2869            .tx
2870            .cursor_read::<tables::ChainState>()?
2871            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
2872            .take(1)
2873            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
2874
2875        let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
2876        Ok(last_safe_block_number)
2877    }
2878}
2879
2880impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
2881    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
2882        Ok(self
2883            .tx
2884            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
2885    }
2886
2887    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
2888        Ok(self
2889            .tx
2890            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
2891    }
2892}
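// A minimal round-trip sketch (hypothetical; assumes a read-write provider `provider_rw`):
//
//   provider_rw.save_finalized_block_number(100)?;
//   assert_eq!(provider_rw.last_finalized_block_number()?, Some(100));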
2893
2894impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
2895    type Tx = TX;
2896
2897    fn tx_ref(&self) -> &Self::Tx {
2898        &self.tx
2899    }
2900
2901    fn tx_mut(&mut self) -> &mut Self::Tx {
2902        &mut self.tx
2903    }
2904
2905    fn into_tx(self) -> Self::Tx {
2906        self.tx
2907    }
2908
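    // Note: this is expected to resolve to the inherent `prune_modes_ref` defined on
    // `DatabaseProvider` (inherent methods take precedence over trait methods), so it is not
    // self-recursive.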
2909    fn prune_modes_ref(&self) -> &PruneModes {
2910        self.prune_modes_ref()
2911    }
2912
2913    /// Commit database transaction and static files.
2914    fn commit(self) -> ProviderResult<bool> {
2915        // For unwinding it makes more sense to commit the database first: if the process is
2916        // interrupted before the static files commit, the static files can simply be truncated
2917        // according to the database checkpoints on the next start-up.
2919        if self.static_file_provider.has_unwind_queued() {
2920            self.tx.commit()?;
2921            self.static_file_provider.commit()?;
2922        } else {
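            // For appending, commit the static files first: if the process is interrupted before
            // the database commit, the extra static file data above the database checkpoints can
            // be pruned on the next start-up.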
2923            self.static_file_provider.commit()?;
2924            self.tx.commit()?;
2925        }
2926
2927        Ok(true)
2928    }
2929}
2930
2931#[cfg(test)]
2932mod tests {
2933    use super::*;
2934    use crate::{
2935        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
2936        BlockWriter,
2937    };
2938    use reth_testing_utils::generators::{self, random_block, BlockParams};
2939
2940    #[test]
2941    fn test_receipts_by_block_range_empty_range() {
2942        let factory = create_test_provider_factory();
2943        let provider = factory.provider().unwrap();
2944
2945        // empty range should return empty vec
2946        let start = 10u64;
2947        let end = 9u64;
2948        let result = provider.receipts_by_block_range(start..=end).unwrap();
2949        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
2950    }
2951
2952    #[test]
2953    fn test_receipts_by_block_range_nonexistent_blocks() {
2954        let factory = create_test_provider_factory();
2955        let provider = factory.provider().unwrap();
2956
2957        // non-existent blocks should return empty vecs for each block
2958        let result = provider.receipts_by_block_range(10..=12).unwrap();
2959        assert_eq!(result, vec![vec![], vec![], vec![]]);
2960    }
2961
2962    #[test]
2963    fn test_receipts_by_block_range_single_block() {
2964        let factory = create_test_provider_factory();
2965        let data = BlockchainTestData::default();
2966
2967        let provider_rw = factory.provider_rw().unwrap();
2968        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
2969        provider_rw
2970            .write_state(
2971                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
2972                crate::OriginalValuesKnown::No,
2973            )
2974            .unwrap();
2975        provider_rw.insert_block(data.blocks[0].0.clone()).unwrap();
2976        provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap();
2977        provider_rw.commit().unwrap();
2978
2979        let provider = factory.provider().unwrap();
2980        let result = provider.receipts_by_block_range(1..=1).unwrap();
2981
2982        // should have one vec with one receipt
2983        assert_eq!(result.len(), 1);
2984        assert_eq!(result[0].len(), 1);
2985        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
2986    }
2987
2988    #[test]
2989    fn test_receipts_by_block_range_multiple_blocks() {
2990        let factory = create_test_provider_factory();
2991        let data = BlockchainTestData::default();
2992
2993        let provider_rw = factory.provider_rw().unwrap();
2994        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
2995        provider_rw
2996            .write_state(
2997                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
2998                crate::OriginalValuesKnown::No,
2999            )
3000            .unwrap();
3001        for i in 0..3 {
3002            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3003            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3004        }
3005        provider_rw.commit().unwrap();
3006
3007        let provider = factory.provider().unwrap();
3008        let result = provider.receipts_by_block_range(1..=3).unwrap();
3009
3010        // should have 3 vecs, each with one receipt
3011        assert_eq!(result.len(), 3);
3012        for (i, block_receipts) in result.iter().enumerate() {
3013            assert_eq!(block_receipts.len(), 1);
3014            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3015        }
3016    }
3017
3018    #[test]
3019    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
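        // Note: every block in `BlockchainTestData` carries a single transaction, so this mainly
        // exercises the range plumbing rather than truly varying per-block transaction counts.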
3020        let factory = create_test_provider_factory();
3021        let data = BlockchainTestData::default();
3022
3023        let provider_rw = factory.provider_rw().unwrap();
3024        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3025        provider_rw
3026            .write_state(
3027                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3028                crate::OriginalValuesKnown::No,
3029            )
3030            .unwrap();
3031
3032        // insert blocks 1-3 with receipts
3033        for i in 0..3 {
3034            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3035            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3036        }
3037        provider_rw.commit().unwrap();
3038
3039        let provider = factory.provider().unwrap();
3040        let result = provider.receipts_by_block_range(1..=3).unwrap();
3041
3042        // verify each block has one receipt
3043        assert_eq!(result.len(), 3);
3044        for block_receipts in &result {
3045            assert_eq!(block_receipts.len(), 1);
3046        }
3047    }
3048
3049    #[test]
3050    fn test_receipts_by_block_range_partial_range() {
3051        let factory = create_test_provider_factory();
3052        let data = BlockchainTestData::default();
3053
3054        let provider_rw = factory.provider_rw().unwrap();
3055        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3056        provider_rw
3057            .write_state(
3058                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3059                crate::OriginalValuesKnown::No,
3060            )
3061            .unwrap();
3062        for i in 0..3 {
3063            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3064            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3065        }
3066        provider_rw.commit().unwrap();
3067
3068        let provider = factory.provider().unwrap();
3069
3070        // request range that includes both existing and non-existing blocks
3071        let result = provider.receipts_by_block_range(2..=5).unwrap();
3072        assert_eq!(result.len(), 4);
3073
3074        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3075        assert_eq!(result[0].len(), 1); // block 2
3076        assert_eq!(result[1].len(), 1); // block 3
3077        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3078        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3079
3080        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3081        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3082    }
3083
3084    #[test]
3085    fn test_receipts_by_block_range_all_empty_blocks() {
3086        let factory = create_test_provider_factory();
3087        let mut rng = generators::rng();
3088
3089        // create blocks with no transactions
3090        let mut blocks = Vec::new();
3091        for i in 0..3 {
3092            let block =
3093                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3094            blocks.push(block);
3095        }
3096
3097        let provider_rw = factory.provider_rw().unwrap();
3098        for block in blocks {
3099            provider_rw.insert_block(block.try_recover().unwrap()).unwrap();
3100        }
3101        provider_rw.commit().unwrap();
3102
3103        let provider = factory.provider().unwrap();
3104        let result = provider.receipts_by_block_range(1..=3).unwrap();
3105
3106        assert_eq!(result.len(), 3);
3107        for block_receipts in result {
3108            assert_eq!(block_receipts.len(), 0);
3109        }
3110    }
3111
3112    #[test]
3113    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3114        let factory = create_test_provider_factory();
3115        let data = BlockchainTestData::default();
3116
3117        let provider_rw = factory.provider_rw().unwrap();
3118        provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap();
3119        provider_rw
3120            .write_state(
3121                &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() },
3122                crate::OriginalValuesKnown::No,
3123            )
3124            .unwrap();
3125        for i in 0..3 {
3126            provider_rw.insert_block(data.blocks[i].0.clone()).unwrap();
3127            provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap();
3128        }
3129        provider_rw.commit().unwrap();
3130
3131        let provider = factory.provider().unwrap();
3132
3133        // get receipts using block range method
3134        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3135
3136        // get receipts using individual block calls
3137        let mut individual_results = Vec::new();
3138        for block_num in 1..=3 {
3139            let receipts =
3140                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3141            individual_results.push(receipts);
3142        }
3143
3144        assert_eq!(range_result, individual_results);
3145    }
3146}