reth_provider/providers/database/
provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17    StageCheckpointReader, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
18    StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
19    TransactionsProviderExt, TrieWriter,
20};
21use alloy_consensus::{
22    transaction::{SignerRecoverable, TransactionMeta},
23    BlockHeader, Header, TxReceipt,
24};
25use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
26use alloy_primitives::{
27    keccak256,
28    map::{hash_map, B256Map, HashMap, HashSet},
29    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
30};
31use itertools::Itertools;
32use rayon::slice::ParallelSliceMut;
33use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
34use reth_db_api::{
35    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
36    database::Database,
37    models::{
38        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
39        ShardedKey, StoredBlockBodyIndices,
40    },
41    table::Table,
42    tables,
43    transaction::{DbTx, DbTxMut},
44    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
45};
46use reth_execution_types::{Chain, ExecutionOutcome};
47use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
48use reth_primitives_traits::{
49    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
50    SealedHeader, SignedTransaction, StorageEntry,
51};
52use reth_prune_types::{
53    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
54};
55use reth_stages_types::{StageCheckpoint, StageId};
56use reth_static_file_types::StaticFileSegment;
57use reth_storage_api::{
58    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, StateProvider,
59    StorageChangeSetReader, TryIntoHistoricalStateProvider,
60};
61use reth_storage_errors::provider::{ProviderResult, RootMismatch};
62use reth_trie::{
63    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
64    updates::{StorageTrieUpdates, TrieUpdates},
65    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
66};
67use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
68use revm_database::states::{
69    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
70};
71use std::{
72    cmp::Ordering,
73    collections::{BTreeMap, BTreeSet},
74    fmt::Debug,
75    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
76    sync::{mpsc, Arc},
77};
78use tracing::{debug, trace};
79
80/// A [`DatabaseProvider`] that holds a read-only database transaction.
81pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
82
83/// A [`DatabaseProvider`] that holds a read-write database transaction.
84///
85/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
86/// Once that issue is solved, we can probably revert back to being an alias type.
87#[derive(Debug)]
88pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
89    pub DatabaseProvider<<DB as Database>::TXMut, N>,
90);
91
92impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
93    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
94
95    fn deref(&self) -> &Self::Target {
96        &self.0
97    }
98}
99
100impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
101    fn deref_mut(&mut self) -> &mut Self::Target {
102        &mut self.0
103    }
104}
105
106impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
107    for DatabaseProviderRW<DB, N>
108{
109    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
110        &self.0
111    }
112}
113
114impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
115    /// Commit database transaction and static file if it exists.
116    pub fn commit(self) -> ProviderResult<bool> {
117        self.0.commit()
118    }
119
120    /// Consume `DbTx` or `DbTxMut`.
121    pub fn into_tx(self) -> <DB as Database>::TXMut {
122        self.0.into_tx()
123    }
124}
125
126impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
127    for DatabaseProvider<<DB as Database>::TXMut, N>
128{
129    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
130        provider.0
131    }
132}
133
134/// A provider struct that fetches data from the database.
135/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
136#[derive(Debug)]
137pub struct DatabaseProvider<TX, N: NodeTypes> {
138    /// Database transaction.
139    tx: TX,
140    /// Chain spec
141    chain_spec: Arc<N::ChainSpec>,
142    /// Static File provider
143    static_file_provider: StaticFileProvider<N::Primitives>,
144    /// Pruning configuration
145    prune_modes: PruneModes,
146    /// Node storage handler.
147    storage: Arc<N::Storage>,
148}
149
150impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
151    /// Returns reference to prune modes.
152    pub const fn prune_modes_ref(&self) -> &PruneModes {
153        &self.prune_modes
154    }
155}
156
157impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
158    /// State provider for latest state
159    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
160        trace!(target: "providers::db", "Returning latest state provider");
161        Box::new(LatestStateProviderRef::new(self))
162    }
163
164    /// Storage provider for state at that given block hash
165    pub fn history_by_block_hash<'a>(
166        &'a self,
167        block_hash: BlockHash,
168    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
169        let mut block_number =
170            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
171        if block_number == self.best_block_number().unwrap_or_default() &&
172            block_number == self.last_block_number().unwrap_or_default()
173        {
174            return Ok(Box::new(LatestStateProviderRef::new(self)))
175        }
176
177        // +1 as the changeset that we want is the one that was applied after this block.
178        block_number += 1;
179
180        let account_history_prune_checkpoint =
181            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
182        let storage_history_prune_checkpoint =
183            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
184
185        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
186
187        // If we pruned account or storage history, we can't return state on every historical block.
188        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
189        if let Some(prune_checkpoint_block_number) =
190            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
191        {
192            state_provider = state_provider.with_lowest_available_account_history_block_number(
193                prune_checkpoint_block_number + 1,
194            );
195        }
196        if let Some(prune_checkpoint_block_number) =
197            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198        {
199            state_provider = state_provider.with_lowest_available_storage_history_block_number(
200                prune_checkpoint_block_number + 1,
201            );
202        }
203
204        Ok(Box::new(state_provider))
205    }
206
207    #[cfg(feature = "test-utils")]
208    /// Sets the prune modes for provider.
209    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
210        self.prune_modes = prune_modes;
211    }
212}
213
214impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
215    type Primitives = N::Primitives;
216}
217
218impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
219    /// Returns a static file provider
220    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
221        self.static_file_provider.clone()
222    }
223}
224
225impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
226    for DatabaseProvider<TX, N>
227{
228    type ChainSpec = N::ChainSpec;
229
230    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
231        self.chain_spec.clone()
232    }
233}
234
235impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
236    /// Creates a provider with an inner read-write transaction.
237    pub const fn new_rw(
238        tx: TX,
239        chain_spec: Arc<N::ChainSpec>,
240        static_file_provider: StaticFileProvider<N::Primitives>,
241        prune_modes: PruneModes,
242        storage: Arc<N::Storage>,
243    ) -> Self {
244        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
245    }
246}
247
248impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
249    fn as_ref(&self) -> &Self {
250        self
251    }
252}
253
254impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
255    /// Unwinds trie state for the given range.
256    ///
257    /// This includes calculating the resulted state root and comparing it with the parent block
258    /// state root.
259    pub fn unwind_trie_state_range(
260        &self,
261        range: RangeInclusive<BlockNumber>,
262    ) -> ProviderResult<()> {
263        let changed_accounts = self
264            .tx
265            .cursor_read::<tables::AccountChangeSets>()?
266            .walk_range(range.clone())?
267            .collect::<Result<Vec<_>, _>>()?;
268
269        // Unwind account hashes. Add changed accounts to account prefix set.
270        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
271        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
272        let mut destroyed_accounts = HashSet::default();
273        for (hashed_address, account) in hashed_addresses {
274            account_prefix_set.insert(Nibbles::unpack(hashed_address));
275            if account.is_none() {
276                destroyed_accounts.insert(hashed_address);
277            }
278        }
279
280        // Unwind account history indices.
281        self.unwind_account_history_indices(changed_accounts.iter())?;
282        let storage_range = BlockNumberAddress::range(range.clone());
283
284        let changed_storages = self
285            .tx
286            .cursor_read::<tables::StorageChangeSets>()?
287            .walk_range(storage_range)?
288            .collect::<Result<Vec<_>, _>>()?;
289
290        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
291        // sets.
292        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
293        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
294        for (hashed_address, hashed_slots) in storage_entries {
295            account_prefix_set.insert(Nibbles::unpack(hashed_address));
296            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
297            for slot in hashed_slots {
298                storage_prefix_set.insert(Nibbles::unpack(slot));
299            }
300            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
301        }
302
303        // Unwind storage history indices.
304        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
305
306        // Calculate the reverted merkle root.
307        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
308        // are pre-loaded.
309        let prefix_sets = TriePrefixSets {
310            account_prefix_set: account_prefix_set.freeze(),
311            storage_prefix_sets,
312            destroyed_accounts,
313        };
314        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
315            .with_prefix_sets(prefix_sets)
316            .root_with_updates()
317            .map_err(reth_db_api::DatabaseError::from)?;
318
319        let parent_number = range.start().saturating_sub(1);
320        let parent_state_root = self
321            .header_by_number(parent_number)?
322            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
323            .state_root();
324
325        // state root should be always correct as we are reverting state.
326        // but for sake of double verification we will check it again.
327        if new_state_root != parent_state_root {
328            let parent_hash = self
329                .block_hash(parent_number)?
330                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
331            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
332                root: GotExpected { got: new_state_root, expected: parent_state_root },
333                block_number: parent_number,
334                block_hash: parent_hash,
335            })))
336        }
337        self.write_trie_updates(&trie_updates)?;
338
339        Ok(())
340    }
341
342    /// Removes receipts from all transactions starting with provided number (inclusive).
343    fn remove_receipts_from(
344        &self,
345        from_tx: TxNumber,
346        last_block: BlockNumber,
347        remove_from: StorageLocation,
348    ) -> ProviderResult<()> {
349        if remove_from.database() {
350            // iterate over block body and remove receipts
351            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
352        }
353
354        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
355            let static_file_receipt_num =
356                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
357
358            let to_delete = static_file_receipt_num
359                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
360                .unwrap_or_default();
361
362            self.static_file_provider
363                .latest_writer(StaticFileSegment::Receipts)?
364                .prune_receipts(to_delete, last_block)?;
365        }
366
367        Ok(())
368    }
369}
370
371impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
372    fn try_into_history_at_block(
373        self,
374        mut block_number: BlockNumber,
375    ) -> ProviderResult<StateProviderBox> {
376        // if the block number is the same as the currently best block number on disk we can use the
377        // latest state provider here
378        if block_number == self.best_block_number().unwrap_or_default() {
379            return Ok(Box::new(LatestStateProvider::new(self)))
380        }
381
382        // +1 as the changeset that we want is the one that was applied after this block.
383        block_number += 1;
384
385        let account_history_prune_checkpoint =
386            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
387        let storage_history_prune_checkpoint =
388            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
389
390        let mut state_provider = HistoricalStateProvider::new(self, block_number);
391
392        // If we pruned account or storage history, we can't return state on every historical block.
393        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
394        if let Some(prune_checkpoint_block_number) =
395            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
396        {
397            state_provider = state_provider.with_lowest_available_account_history_block_number(
398                prune_checkpoint_block_number + 1,
399            );
400        }
401        if let Some(prune_checkpoint_block_number) =
402            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403        {
404            state_provider = state_provider.with_lowest_available_storage_history_block_number(
405                prune_checkpoint_block_number + 1,
406            );
407        }
408
409        Ok(Box::new(state_provider))
410    }
411}
412
413impl<
414        Tx: DbTx + DbTxMut + 'static,
415        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
416    > DatabaseProvider<Tx, N>
417{
418    // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
419    // #[cfg(any(test, feature = "test-utils"))]
420    /// Inserts an historical block. **Used for setting up test environments**
421    pub fn insert_historical_block(
422        &self,
423        block: RecoveredBlock<<Self as BlockWriter>::Block>,
424    ) -> ProviderResult<StoredBlockBodyIndices> {
425        let ttd = if block.number() == 0 {
426            block.header().difficulty()
427        } else {
428            let parent_block_number = block.number() - 1;
429            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
430            parent_ttd + block.header().difficulty()
431        };
432
433        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
434
435        // Backfill: some tests start at a forward block number, but static files require no gaps.
436        let segment_header = writer.user_header();
437        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
438            for block_number in 0..block.number() {
439                let mut prev = block.clone_header();
440                prev.number = block_number;
441                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
442            }
443        }
444
445        writer.append_header(block.header(), ttd, &block.hash())?;
446
447        self.insert_block(block, StorageLocation::Database)
448    }
449}
450
451/// For a given key, unwind all history shards that contain block numbers at or above the given
452/// block number.
453///
454/// S - Sharded key subtype.
455/// T - Table to walk over.
456/// C - Cursor implementation.
457///
458/// This function walks the entries from the given start key and deletes all shards that belong to
459/// the key and contain block numbers at or above the given block number. Shards entirely below
460/// the block number are preserved.
461///
462/// The boundary shard (the shard that spans across the block number) is removed from the database.
463/// Any indices that are below the block number are filtered out and returned for reinsertion.
464/// The boundary shard is returned for reinsertion (if it's not empty).
465fn unwind_history_shards<S, T, C>(
466    cursor: &mut C,
467    start_key: T::Key,
468    block_number: BlockNumber,
469    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
470) -> ProviderResult<Vec<u64>>
471where
472    T: Table<Value = BlockNumberList>,
473    T::Key: AsRef<ShardedKey<S>>,
474    C: DbCursorRO<T> + DbCursorRW<T>,
475{
476    // Start from the given key and iterate through shards
477    let mut item = cursor.seek_exact(start_key)?;
478    while let Some((sharded_key, list)) = item {
479        // If the shard does not belong to the key, break.
480        if !shard_belongs_to_key(&sharded_key) {
481            break
482        }
483
484        // Always delete the current shard from the database first
485        // We'll decide later what (if anything) to reinsert
486        cursor.delete_current()?;
487
488        // Get the first (lowest) block number in this shard
489        // All block numbers in a shard are sorted in ascending order
490        let first = list.iter().next().expect("List can't be empty");
491
492        // Case 1: Entire shard is at or above the unwinding point
493        // Keep it deleted (don't return anything for reinsertion)
494        if first >= block_number {
495            item = cursor.prev()?;
496            continue
497        }
498        // Case 2: This is a boundary shard (spans across the unwinding point)
499        // The shard contains some blocks below and some at/above the unwinding point
500        else if block_number <= sharded_key.as_ref().highest_block_number {
501            // Return only the block numbers that are below the unwinding point
502            // These will be reinserted to preserve the historical data
503            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
504        }
505        // Case 3: Entire shard is below the unwinding point
506        // Return all block numbers for reinsertion (preserve entire shard)
507        return Ok(list.iter().collect::<Vec<_>>())
508    }
509
510    // No shards found or all processed
511    Ok(Vec::new())
512}
513
514impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
515    /// Creates a provider with an inner read-only transaction.
516    pub const fn new(
517        tx: TX,
518        chain_spec: Arc<N::ChainSpec>,
519        static_file_provider: StaticFileProvider<N::Primitives>,
520        prune_modes: PruneModes,
521        storage: Arc<N::Storage>,
522    ) -> Self {
523        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
524    }
525
526    /// Consume `DbTx` or `DbTxMut`.
527    pub fn into_tx(self) -> TX {
528        self.tx
529    }
530
531    /// Pass `DbTx` or `DbTxMut` mutable reference.
532    pub const fn tx_mut(&mut self) -> &mut TX {
533        &mut self.tx
534    }
535
536    /// Pass `DbTx` or `DbTxMut` immutable reference.
537    pub const fn tx_ref(&self) -> &TX {
538        &self.tx
539    }
540
541    /// Returns a reference to the chain specification.
542    pub fn chain_spec(&self) -> &N::ChainSpec {
543        &self.chain_spec
544    }
545}
546
547impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
548    fn transactions_by_tx_range_with_cursor<C>(
549        &self,
550        range: impl RangeBounds<TxNumber>,
551        cursor: &mut C,
552    ) -> ProviderResult<Vec<TxTy<N>>>
553    where
554        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
555    {
556        self.static_file_provider.get_range_with_static_file_or_database(
557            StaticFileSegment::Transactions,
558            to_range(range),
559            |static_file, range, _| static_file.transactions_by_tx_range(range),
560            |range, _| self.cursor_collect(cursor, range),
561            |_| true,
562        )
563    }
564
565    fn recovered_block<H, HF, B, BF>(
566        &self,
567        id: BlockHashOrNumber,
568        _transaction_kind: TransactionVariant,
569        header_by_number: HF,
570        construct_block: BF,
571    ) -> ProviderResult<Option<B>>
572    where
573        H: AsRef<HeaderTy<N>>,
574        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
575        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
576    {
577        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
578        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
579
580        // Get the block body
581        //
582        // If the body indices are not found, this means that the transactions either do not exist
583        // in the database yet, or they do exit but are not indexed. If they exist but are not
584        // indexed, we don't have enough information to return the block anyways, so we return
585        // `None`.
586        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
587
588        let tx_range = body.tx_num_range();
589
590        let (transactions, senders) = if tx_range.is_empty() {
591            (vec![], vec![])
592        } else {
593            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
594        };
595
596        let body = self
597            .storage
598            .reader()
599            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
600            .pop()
601            .ok_or(ProviderError::InvalidStorageOutput)?;
602
603        construct_block(header, body, senders)
604    }
605
606    /// Returns a range of blocks from the database.
607    ///
608    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
609    /// construct blocks from the following inputs:
610    ///     – Header
611    ///     - Range of transaction numbers
612    ///     – Ommers
613    ///     – Withdrawals
614    ///     – Senders
615    fn block_range<F, H, HF, R>(
616        &self,
617        range: RangeInclusive<BlockNumber>,
618        headers_range: HF,
619        mut assemble_block: F,
620    ) -> ProviderResult<Vec<R>>
621    where
622        H: AsRef<HeaderTy<N>>,
623        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
624        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
625    {
626        if range.is_empty() {
627            return Ok(Vec::new())
628        }
629
630        let len = range.end().saturating_sub(*range.start()) as usize;
631        let mut blocks = Vec::with_capacity(len);
632
633        let headers = headers_range(range.clone())?;
634        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
635
636        // If the body indices are not found, this means that the transactions either do
637        // not exist in the database yet, or they do exit but are
638        // not indexed. If they exist but are not indexed, we don't
639        // have enough information to return the block anyways, so
640        // we skip the block.
641        let present_headers = self
642            .block_body_indices_range(range)?
643            .into_iter()
644            .map(|b| b.tx_num_range())
645            .zip(headers)
646            .collect::<Vec<_>>();
647
648        let mut inputs = Vec::new();
649        for (tx_range, header) in &present_headers {
650            let transactions = if tx_range.is_empty() {
651                Vec::new()
652            } else {
653                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
654            };
655
656            inputs.push((header.as_ref(), transactions));
657        }
658
659        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
660
661        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
662            blocks.push(assemble_block(header, body, tx_range)?);
663        }
664
665        Ok(blocks)
666    }
667
668    /// Returns a range of blocks from the database, along with the senders of each
669    /// transaction in the blocks.
670    ///
671    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
672    /// construct blocks from the following inputs:
673    ///     – Header
674    ///     - Transactions
675    ///     – Ommers
676    ///     – Withdrawals
677    ///     – Senders
678    fn block_with_senders_range<H, HF, B, BF>(
679        &self,
680        range: RangeInclusive<BlockNumber>,
681        headers_range: HF,
682        assemble_block: BF,
683    ) -> ProviderResult<Vec<B>>
684    where
685        H: AsRef<HeaderTy<N>>,
686        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
687        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
688    {
689        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
690
691        self.block_range(range, headers_range, |header, body, tx_range| {
692            let senders = if tx_range.is_empty() {
693                Vec::new()
694            } else {
695                // fetch senders from the senders table
696                let known_senders =
697                    senders_cursor
698                        .walk_range(tx_range.clone())?
699                        .collect::<Result<HashMap<_, _>, _>>()?;
700
701                let mut senders = Vec::with_capacity(body.transactions().len());
702                for (tx_num, tx) in tx_range.zip(body.transactions()) {
703                    match known_senders.get(&tx_num) {
704                        None => {
705                            // recover the sender from the transaction if not found
706                            let sender = tx.recover_signer_unchecked()?;
707                            senders.push(sender);
708                        }
709                        Some(sender) => senders.push(*sender),
710                    }
711                }
712
713                senders
714            };
715
716            assemble_block(header, body, senders)
717        })
718    }
719
720    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
721    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
722    /// account changesets.
723    fn populate_bundle_state<A, S>(
724        &self,
725        account_changeset: Vec<(u64, AccountBeforeTx)>,
726        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
727        plain_accounts_cursor: &mut A,
728        plain_storage_cursor: &mut S,
729    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
730    where
731        A: DbCursorRO<PlainAccountState>,
732        S: DbDupCursorRO<PlainStorageState>,
733    {
734        // iterate previous value and get plain state value to create changeset
735        // Double option around Account represent if Account state is know (first option) and
736        // account is removed (Second Option)
737        let mut state: BundleStateInit = HashMap::default();
738
739        // This is not working for blocks that are not at tip. as plain state is not the last
740        // state of end range. We should rename the functions or add support to access
741        // History state. Accessing history state can be tricky but we are not gaining
742        // anything.
743
744        let mut reverts: RevertsInit = HashMap::default();
745
746        // add account changeset changes
747        for (block_number, account_before) in account_changeset.into_iter().rev() {
748            let AccountBeforeTx { info: old_info, address } = account_before;
749            match state.entry(address) {
750                hash_map::Entry::Vacant(entry) => {
751                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
752                    entry.insert((old_info, new_info, HashMap::default()));
753                }
754                hash_map::Entry::Occupied(mut entry) => {
755                    // overwrite old account state.
756                    entry.get_mut().0 = old_info;
757                }
758            }
759            // insert old info into reverts.
760            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
761        }
762
763        // add storage changeset changes
764        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
765            let BlockNumberAddress((block_number, address)) = block_and_address;
766            // get account state or insert from plain state.
767            let account_state = match state.entry(address) {
768                hash_map::Entry::Vacant(entry) => {
769                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
770                    entry.insert((present_info, present_info, HashMap::default()))
771                }
772                hash_map::Entry::Occupied(entry) => entry.into_mut(),
773            };
774
775            // match storage.
776            match account_state.2.entry(old_storage.key) {
777                hash_map::Entry::Vacant(entry) => {
778                    let new_storage = plain_storage_cursor
779                        .seek_by_key_subkey(address, old_storage.key)?
780                        .filter(|storage| storage.key == old_storage.key)
781                        .unwrap_or_default();
782                    entry.insert((old_storage.value, new_storage.value));
783                }
784                hash_map::Entry::Occupied(mut entry) => {
785                    entry.get_mut().0 = old_storage.value;
786                }
787            };
788
789            reverts
790                .entry(block_number)
791                .or_default()
792                .entry(address)
793                .or_default()
794                .1
795                .push(old_storage);
796        }
797
798        Ok((state, reverts))
799    }
800}
801
802impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
803    /// Commit database transaction.
804    pub fn commit(self) -> ProviderResult<bool> {
805        Ok(self.tx.commit()?)
806    }
807
808    /// Load shard and remove it. If list is empty, last shard was full or
809    /// there are no shards at all.
810    fn take_shard<T>(
811        &self,
812        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
813        key: T::Key,
814    ) -> ProviderResult<Vec<u64>>
815    where
816        T: Table<Value = BlockNumberList>,
817    {
818        if let Some((_, list)) = cursor.seek_exact(key)? {
819            // delete old shard so new one can be inserted.
820            cursor.delete_current()?;
821            let list = list.iter().collect::<Vec<_>>();
822            return Ok(list)
823        }
824        Ok(Vec::new())
825    }
826
827    /// Insert history index to the database.
828    ///
829    /// For each updated partial key, this function removes the last shard from
830    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
831    /// inserts the new shards back into the database.
832    ///
833    /// This function is used by history indexing stages.
834    fn append_history_index<P, T>(
835        &self,
836        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
837        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
838    ) -> ProviderResult<()>
839    where
840        P: Copy,
841        T: Table<Value = BlockNumberList>,
842    {
843        let mut cursor = self.tx.cursor_write::<T>()?;
844        for (partial_key, indices) in index_updates {
845            let mut last_shard =
846                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
847            last_shard.extend(indices);
848            // Chunk indices and insert them in shards of N size.
849            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
850            while let Some(list) = chunks.next() {
851                let highest_block_number = if chunks.peek().is_some() {
852                    *list.last().expect("`chunks` does not return empty list")
853                } else {
854                    // Insert last list with `u64::MAX`.
855                    u64::MAX
856                };
857                cursor.insert(
858                    sharded_key_factory(partial_key, highest_block_number),
859                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
860                )?;
861            }
862        }
863        Ok(())
864    }
865}
866
867impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
868    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
869        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
870    }
871}
872
873impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
874    fn changed_accounts_with_range(
875        &self,
876        range: impl RangeBounds<BlockNumber>,
877    ) -> ProviderResult<BTreeSet<Address>> {
878        self.tx
879            .cursor_read::<tables::AccountChangeSets>()?
880            .walk_range(range)?
881            .map(|entry| {
882                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
883            })
884            .collect()
885    }
886
887    fn basic_accounts(
888        &self,
889        iter: impl IntoIterator<Item = Address>,
890    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
891        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
892        Ok(iter
893            .into_iter()
894            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
895            .collect::<Result<Vec<_>, _>>()?)
896    }
897
898    fn changed_accounts_and_blocks_with_range(
899        &self,
900        range: RangeInclusive<BlockNumber>,
901    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
902        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
903
904        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
905            BTreeMap::new(),
906            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
907                let (index, account) = entry?;
908                accounts.entry(account.address).or_default().push(index);
909                Ok(accounts)
910            },
911        )?;
912
913        Ok(account_transitions)
914    }
915}
916
917impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
918    fn storage_changeset(
919        &self,
920        block_number: BlockNumber,
921    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
922        let range = block_number..=block_number;
923        let storage_range = BlockNumberAddress::range(range);
924        self.tx
925            .cursor_dup_read::<tables::StorageChangeSets>()?
926            .walk_range(storage_range)?
927            .map(|result| -> ProviderResult<_> { Ok(result?) })
928            .collect()
929    }
930}
931
932impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
933    fn account_block_changeset(
934        &self,
935        block_number: BlockNumber,
936    ) -> ProviderResult<Vec<AccountBeforeTx>> {
937        let range = block_number..=block_number;
938        self.tx
939            .cursor_read::<tables::AccountChangeSets>()?
940            .walk_range(range)?
941            .map(|result| -> ProviderResult<_> {
942                let (_, account_before) = result?;
943                Ok(account_before)
944            })
945            .collect()
946    }
947}
948
949impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
950    for DatabaseProvider<TX, N>
951{
952    type Header = HeaderTy<N>;
953
954    fn local_tip_header(
955        &self,
956        highest_uninterrupted_block: BlockNumber,
957    ) -> ProviderResult<SealedHeader<Self::Header>> {
958        let static_file_provider = self.static_file_provider();
959
960        // Make sure Headers static file is at the same height. If it's further, this
961        // input execution was interrupted previously and we need to unwind the static file.
962        let next_static_file_block_num = static_file_provider
963            .get_highest_static_file_block(StaticFileSegment::Headers)
964            .map(|id| id + 1)
965            .unwrap_or_default();
966        let next_block = highest_uninterrupted_block + 1;
967
968        match next_static_file_block_num.cmp(&next_block) {
969            // The node shutdown between an executed static file commit and before the database
970            // commit, so we need to unwind the static files.
971            Ordering::Greater => {
972                let mut static_file_producer =
973                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
974                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
975                // Since this is a database <-> static file inconsistency, we commit the change
976                // straight away.
977                static_file_producer.commit()?
978            }
979            Ordering::Less => {
980                // There's either missing or corrupted files.
981                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
982            }
983            Ordering::Equal => {}
984        }
985
986        let local_head = static_file_provider
987            .sealed_header(highest_uninterrupted_block)?
988            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
989
990        Ok(local_head)
991    }
992}
993
994impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
995    type Header = HeaderTy<N>;
996
997    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
998        if let Some(num) = self.block_number(*block_hash)? {
999            Ok(self.header_by_number(num)?)
1000        } else {
1001            Ok(None)
1002        }
1003    }
1004
1005    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1006        self.static_file_provider.get_with_static_file_or_database(
1007            StaticFileSegment::Headers,
1008            num,
1009            |static_file| static_file.header_by_number(num),
1010            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1011        )
1012    }
1013
1014    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1015        if let Some(num) = self.block_number(*block_hash)? {
1016            self.header_td_by_number(num)
1017        } else {
1018            Ok(None)
1019        }
1020    }
1021
1022    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1023        if self.chain_spec.is_paris_active_at_block(number) {
1024            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
1025                // if this block is higher than the final paris(merge) block, return the final paris
1026                // difficulty
1027                return Ok(Some(td))
1028            }
1029        }
1030
1031        self.static_file_provider.get_with_static_file_or_database(
1032            StaticFileSegment::Headers,
1033            number,
1034            |static_file| static_file.header_td_by_number(number),
1035            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1036        )
1037    }
1038
1039    fn headers_range(
1040        &self,
1041        range: impl RangeBounds<BlockNumber>,
1042    ) -> ProviderResult<Vec<Self::Header>> {
1043        self.static_file_provider.get_range_with_static_file_or_database(
1044            StaticFileSegment::Headers,
1045            to_range(range),
1046            |static_file, range, _| static_file.headers_range(range),
1047            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1048            |_| true,
1049        )
1050    }
1051
1052    fn sealed_header(
1053        &self,
1054        number: BlockNumber,
1055    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1056        self.static_file_provider.get_with_static_file_or_database(
1057            StaticFileSegment::Headers,
1058            number,
1059            |static_file| static_file.sealed_header(number),
1060            || {
1061                if let Some(header) = self.header_by_number(number)? {
1062                    let hash = self
1063                        .block_hash(number)?
1064                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1065                    Ok(Some(SealedHeader::new(header, hash)))
1066                } else {
1067                    Ok(None)
1068                }
1069            },
1070        )
1071    }
1072
1073    fn sealed_headers_while(
1074        &self,
1075        range: impl RangeBounds<BlockNumber>,
1076        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1077    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1078        self.static_file_provider.get_range_with_static_file_or_database(
1079            StaticFileSegment::Headers,
1080            to_range(range),
1081            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1082            |range, mut predicate| {
1083                let mut headers = vec![];
1084                for entry in
1085                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1086                {
1087                    let (number, header) = entry?;
1088                    let hash = self
1089                        .block_hash(number)?
1090                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1091                    let sealed = SealedHeader::new(header, hash);
1092                    if !predicate(&sealed) {
1093                        break
1094                    }
1095                    headers.push(sealed);
1096                }
1097                Ok(headers)
1098            },
1099            predicate,
1100        )
1101    }
1102}
1103
1104impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1105    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1106        self.static_file_provider.get_with_static_file_or_database(
1107            StaticFileSegment::Headers,
1108            number,
1109            |static_file| static_file.block_hash(number),
1110            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1111        )
1112    }
1113
1114    fn canonical_hashes_range(
1115        &self,
1116        start: BlockNumber,
1117        end: BlockNumber,
1118    ) -> ProviderResult<Vec<B256>> {
1119        self.static_file_provider.get_range_with_static_file_or_database(
1120            StaticFileSegment::Headers,
1121            start..end,
1122            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1123            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1124            |_| true,
1125        )
1126    }
1127}
1128
1129impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1130    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1131        let best_number = self.best_block_number()?;
1132        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1133        Ok(ChainInfo { best_hash, best_number })
1134    }
1135
1136    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1137        // The best block number is tracked via the finished stage which gets updated in the same tx
1138        // when new blocks committed
1139        Ok(self
1140            .get_stage_checkpoint(StageId::Finish)?
1141            .map(|checkpoint| checkpoint.block_number)
1142            .unwrap_or_default())
1143    }
1144
1145    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1146        Ok(self
1147            .tx
1148            .cursor_read::<tables::CanonicalHeaders>()?
1149            .last()?
1150            .map(|(num, _)| num)
1151            .max(
1152                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1153            )
1154            .unwrap_or_default())
1155    }
1156
1157    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1158        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1159    }
1160}
1161
1162impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1163    type Block = BlockTy<N>;
1164
1165    fn find_block_by_hash(
1166        &self,
1167        hash: B256,
1168        source: BlockSource,
1169    ) -> ProviderResult<Option<Self::Block>> {
1170        if source.is_canonical() {
1171            self.block(hash.into())
1172        } else {
1173            Ok(None)
1174        }
1175    }
1176
1177    /// Returns the block with matching number from database.
1178    ///
1179    /// If the header for this block is not found, this returns `None`.
1180    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1181    /// will return None.
1182    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1183        if let Some(number) = self.convert_hash_or_number(id)? {
1184            if let Some(header) = self.header_by_number(number)? {
1185                // If the body indices are not found, this means that the transactions either do not
1186                // exist in the database yet, or they do exit but are not indexed.
1187                // If they exist but are not indexed, we don't have enough
1188                // information to return the block anyways, so we return `None`.
1189                let Some(transactions) = self.transactions_by_block(number.into())? else {
1190                    return Ok(None)
1191                };
1192
1193                let body = self
1194                    .storage
1195                    .reader()
1196                    .read_block_bodies(self, vec![(&header, transactions)])?
1197                    .pop()
1198                    .ok_or(ProviderError::InvalidStorageOutput)?;
1199
1200                return Ok(Some(Self::Block::new(header, body)))
1201            }
1202        }
1203
1204        Ok(None)
1205    }
1206    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1207        Ok(None)
1208    }
1209
1210    fn pending_block_and_receipts(
1211        &self,
1212    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1213        Ok(None)
1214    }
1215
1216    /// Returns the block with senders with matching number or hash from database.
1217    ///
1218    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1219    /// spot, and we want fast querying.**
1220    ///
1221    /// If the header for this block is not found, this returns `None`.
1222    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1223    /// will return None.
1224    fn recovered_block(
1225        &self,
1226        id: BlockHashOrNumber,
1227        transaction_kind: TransactionVariant,
1228    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1229        self.recovered_block(
1230            id,
1231            transaction_kind,
1232            |block_number| self.header_by_number(block_number),
1233            |header, body, senders| {
1234                Self::Block::new(header, body)
1235                    // Note: we're using unchecked here because we know the block contains valid txs
1236                    // wrt to its height and can ignore the s value check so pre
1237                    // EIP-2 txs are allowed
1238                    .try_into_recovered_unchecked(senders)
1239                    .map(Some)
1240                    .map_err(|_| ProviderError::SenderRecoveryError)
1241            },
1242        )
1243    }
1244
1245    fn sealed_block_with_senders(
1246        &self,
1247        id: BlockHashOrNumber,
1248        transaction_kind: TransactionVariant,
1249    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1250        self.recovered_block(
1251            id,
1252            transaction_kind,
1253            |block_number| self.sealed_header(block_number),
1254            |header, body, senders| {
1255                Self::Block::new_sealed(header, body)
1256                    // Note: we're using unchecked here because we know the block contains valid txs
1257                    // wrt to its height and can ignore the s value check so pre
1258                    // EIP-2 txs are allowed
1259                    .try_with_senders_unchecked(senders)
1260                    .map(Some)
1261                    .map_err(|_| ProviderError::SenderRecoveryError)
1262            },
1263        )
1264    }
1265
1266    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1267        self.block_range(
1268            range,
1269            |range| self.headers_range(range),
1270            |header, body, _| Ok(Self::Block::new(header, body)),
1271        )
1272    }
1273
1274    fn block_with_senders_range(
1275        &self,
1276        range: RangeInclusive<BlockNumber>,
1277    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1278        self.block_with_senders_range(
1279            range,
1280            |range| self.headers_range(range),
1281            |header, body, senders| {
1282                Self::Block::new(header, body)
1283                    .try_into_recovered_unchecked(senders)
1284                    .map_err(|_| ProviderError::SenderRecoveryError)
1285            },
1286        )
1287    }
1288
1289    fn recovered_block_range(
1290        &self,
1291        range: RangeInclusive<BlockNumber>,
1292    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1293        self.block_with_senders_range(
1294            range,
1295            |range| self.sealed_headers_range(range),
1296            |header, body, senders| {
1297                Self::Block::new_sealed(header, body)
1298                    .try_with_senders(senders)
1299                    .map_err(|_| ProviderError::SenderRecoveryError)
1300            },
1301        )
1302    }
1303}
1304
1305impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1306    for DatabaseProvider<TX, N>
1307{
1308    /// Recovers transaction hashes by walking through `Transactions` table and
1309    /// calculating them in a parallel manner. Returned unsorted.
1310    fn transaction_hashes_by_range(
1311        &self,
1312        tx_range: Range<TxNumber>,
1313    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1314        self.static_file_provider.get_range_with_static_file_or_database(
1315            StaticFileSegment::Transactions,
1316            tx_range,
1317            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1318            |tx_range, _| {
1319                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1320                let tx_range_size = tx_range.clone().count();
1321                let tx_walker = tx_cursor.walk_range(tx_range)?;
1322
1323                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1324                let mut channels = Vec::with_capacity(chunk_size);
1325                let mut transaction_count = 0;
1326
1327                #[inline]
1328                fn calculate_hash<T>(
1329                    entry: Result<(TxNumber, T), DatabaseError>,
1330                    rlp_buf: &mut Vec<u8>,
1331                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1332                where
1333                    T: Encodable2718,
1334                {
1335                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1336                    tx.encode_2718(rlp_buf);
1337                    Ok((keccak256(rlp_buf), tx_id))
1338                }
1339
1340                for chunk in &tx_walker.chunks(chunk_size) {
1341                    let (tx, rx) = mpsc::channel();
1342                    channels.push(rx);
1343
1344                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1345                    // not Send)
1346                    let chunk: Vec<_> = chunk.collect();
1347                    transaction_count += chunk.len();
1348
1349                    // Spawn the task onto the global rayon pool
1350                    // This task will send the results through the channel after it has calculated
1351                    // the hash.
1352                    rayon::spawn(move || {
1353                        let mut rlp_buf = Vec::with_capacity(128);
1354                        for entry in chunk {
1355                            rlp_buf.clear();
1356                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1357                        }
1358                    });
1359                }
1360                let mut tx_list = Vec::with_capacity(transaction_count);
1361
1362                // Iterate over channels and append the tx hashes unsorted
1363                for channel in channels {
1364                    while let Ok(tx) = channel.recv() {
1365                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1366                        tx_list.push((tx_hash, tx_id));
1367                    }
1368                }
1369
1370                Ok(tx_list)
1371            },
1372            |_| true,
1373        )
1374    }
1375}
1376
1377// Calculates the hash of the given transaction
1378impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1379    type Transaction = TxTy<N>;
1380
1381    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1382        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1383    }
1384
1385    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1386        self.static_file_provider.get_with_static_file_or_database(
1387            StaticFileSegment::Transactions,
1388            id,
1389            |static_file| static_file.transaction_by_id(id),
1390            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1391        )
1392    }
1393
1394    fn transaction_by_id_unhashed(
1395        &self,
1396        id: TxNumber,
1397    ) -> ProviderResult<Option<Self::Transaction>> {
1398        self.static_file_provider.get_with_static_file_or_database(
1399            StaticFileSegment::Transactions,
1400            id,
1401            |static_file| static_file.transaction_by_id_unhashed(id),
1402            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1403        )
1404    }
1405
1406    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1407        if let Some(id) = self.transaction_id(hash)? {
1408            Ok(self.transaction_by_id_unhashed(id)?)
1409        } else {
1410            Ok(None)
1411        }
1412    }
1413
1414    fn transaction_by_hash_with_meta(
1415        &self,
1416        tx_hash: TxHash,
1417    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1418        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1419        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1420            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1421                if let Some(block_number) =
1422                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1423                {
1424                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1425                        let (header, block_hash) = sealed_header.split();
1426                        if let Some(block_body) = self.block_body_indices(block_number)? {
1427                            // the index of the tx in the block is the offset:
1428                            // len([start..tx_id])
1429                            // NOTE: `transaction_id` is always `>=` the block's first
1430                            // index
1431                            let index = transaction_id - block_body.first_tx_num();
1432
1433                            let meta = TransactionMeta {
1434                                tx_hash,
1435                                index,
1436                                block_hash,
1437                                block_number,
1438                                base_fee: header.base_fee_per_gas(),
1439                                excess_blob_gas: header.excess_blob_gas(),
1440                                timestamp: header.timestamp(),
1441                            };
1442
1443                            return Ok(Some((transaction, meta)))
1444                        }
1445                    }
1446                }
1447            }
1448        }
1449
1450        Ok(None)
1451    }
1452
1453    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1454        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1455        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1456    }
1457
1458    fn transactions_by_block(
1459        &self,
1460        id: BlockHashOrNumber,
1461    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1462        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1463
1464        if let Some(block_number) = self.convert_hash_or_number(id)? {
1465            if let Some(body) = self.block_body_indices(block_number)? {
1466                let tx_range = body.tx_num_range();
1467                return if tx_range.is_empty() {
1468                    Ok(Some(Vec::new()))
1469                } else {
1470                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1471                }
1472            }
1473        }
1474        Ok(None)
1475    }
1476
1477    fn transactions_by_block_range(
1478        &self,
1479        range: impl RangeBounds<BlockNumber>,
1480    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1481        let range = to_range(range);
1482        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1483
1484        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1485            .into_iter()
1486            .map(|body| {
1487                let tx_num_range = body.tx_num_range();
1488                if tx_num_range.is_empty() {
1489                    Ok(Vec::new())
1490                } else {
1491                    Ok(self
1492                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1493                        .into_iter()
1494                        .collect())
1495                }
1496            })
1497            .collect()
1498    }
1499
1500    fn transactions_by_tx_range(
1501        &self,
1502        range: impl RangeBounds<TxNumber>,
1503    ) -> ProviderResult<Vec<Self::Transaction>> {
1504        self.transactions_by_tx_range_with_cursor(
1505            range,
1506            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1507        )
1508    }
1509
1510    fn senders_by_tx_range(
1511        &self,
1512        range: impl RangeBounds<TxNumber>,
1513    ) -> ProviderResult<Vec<Address>> {
1514        self.cursor_read_collect::<tables::TransactionSenders>(range)
1515    }
1516
1517    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1518        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1519    }
1520}
1521
1522impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1523    type Receipt = ReceiptTy<N>;
1524
1525    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1526        self.static_file_provider.get_with_static_file_or_database(
1527            StaticFileSegment::Receipts,
1528            id,
1529            |static_file| static_file.receipt(id),
1530            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1531        )
1532    }
1533
1534    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1535        if let Some(id) = self.transaction_id(hash)? {
1536            self.receipt(id)
1537        } else {
1538            Ok(None)
1539        }
1540    }
1541
1542    fn receipts_by_block(
1543        &self,
1544        block: BlockHashOrNumber,
1545    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1546        if let Some(number) = self.convert_hash_or_number(block)? {
1547            if let Some(body) = self.block_body_indices(number)? {
1548                let tx_range = body.tx_num_range();
1549                return if tx_range.is_empty() {
1550                    Ok(Some(Vec::new()))
1551                } else {
1552                    self.receipts_by_tx_range(tx_range).map(Some)
1553                }
1554            }
1555        }
1556        Ok(None)
1557    }
1558
1559    fn receipts_by_tx_range(
1560        &self,
1561        range: impl RangeBounds<TxNumber>,
1562    ) -> ProviderResult<Vec<Self::Receipt>> {
1563        self.static_file_provider.get_range_with_static_file_or_database(
1564            StaticFileSegment::Receipts,
1565            to_range(range),
1566            |static_file, range, _| static_file.receipts_by_tx_range(range),
1567            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1568            |_| true,
1569        )
1570    }
1571
1572    fn receipts_by_block_range(
1573        &self,
1574        block_range: RangeInclusive<BlockNumber>,
1575    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1576        if block_range.is_empty() {
1577            return Ok(Vec::new());
1578        }
1579
1580        // collect block body indices for each block in the range
1581        let mut block_body_indices = Vec::new();
1582        for block_num in block_range {
1583            if let Some(indices) = self.block_body_indices(block_num)? {
1584                block_body_indices.push(indices);
1585            } else {
1586                // use default indices for missing blocks (empty block)
1587                block_body_indices.push(StoredBlockBodyIndices::default());
1588            }
1589        }
1590
1591        if block_body_indices.is_empty() {
1592            return Ok(Vec::new());
1593        }
1594
1595        // find blocks with transactions to determine transaction range
1596        let non_empty_blocks: Vec<_> =
1597            block_body_indices.iter().filter(|indices| indices.tx_count > 0).collect();
1598
1599        if non_empty_blocks.is_empty() {
1600            // all blocks are empty
1601            return Ok(vec![Vec::new(); block_body_indices.len()]);
1602        }
1603
1604        // calculate the overall transaction range
1605        let first_tx = non_empty_blocks[0].first_tx_num();
1606        let last_tx = non_empty_blocks[non_empty_blocks.len() - 1].last_tx_num();
1607
1608        // fetch all receipts in the transaction range
1609        let all_receipts = self.receipts_by_tx_range(first_tx..=last_tx)?;
1610        let mut receipts_iter = all_receipts.into_iter();
1611
1612        // distribute receipts to their respective blocks
1613        let mut result = Vec::with_capacity(block_body_indices.len());
1614        for indices in &block_body_indices {
1615            if indices.tx_count == 0 {
1616                result.push(Vec::new());
1617            } else {
1618                let block_receipts =
1619                    receipts_iter.by_ref().take(indices.tx_count as usize).collect();
1620                result.push(block_receipts);
1621            }
1622        }
1623
1624        Ok(result)
1625    }
1626}
1627
1628impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1629    for DatabaseProvider<TX, N>
1630{
1631    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1632        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1633    }
1634
1635    fn block_body_indices_range(
1636        &self,
1637        range: RangeInclusive<BlockNumber>,
1638    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1639        self.cursor_read_collect::<tables::BlockBodyIndices>(range)
1640    }
1641}
1642
1643impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1644    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1645        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1646    }
1647
1648    /// Get stage checkpoint progress.
1649    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1650        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1651    }
1652
1653    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1654        self.tx
1655            .cursor_read::<tables::StageCheckpoints>()?
1656            .walk(None)?
1657            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1658            .map_err(ProviderError::Database)
1659    }
1660}
1661
1662impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1663    /// Save stage checkpoint.
1664    fn save_stage_checkpoint(
1665        &self,
1666        id: StageId,
1667        checkpoint: StageCheckpoint,
1668    ) -> ProviderResult<()> {
1669        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1670    }
1671
1672    /// Save stage checkpoint progress.
1673    fn save_stage_checkpoint_progress(
1674        &self,
1675        id: StageId,
1676        checkpoint: Vec<u8>,
1677    ) -> ProviderResult<()> {
1678        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1679    }
1680
1681    fn update_pipeline_stages(
1682        &self,
1683        block_number: BlockNumber,
1684        drop_stage_checkpoint: bool,
1685    ) -> ProviderResult<()> {
1686        // iterate over all existing stages in the table and update its progress.
1687        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1688        for stage_id in StageId::ALL {
1689            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1690            cursor.upsert(
1691                stage_id.to_string(),
1692                &StageCheckpoint {
1693                    block_number,
1694                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1695                },
1696            )?;
1697        }
1698
1699        Ok(())
1700    }
1701}
1702
1703impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1704    fn plain_state_storages(
1705        &self,
1706        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1707    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1708        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1709
1710        addresses_with_keys
1711            .into_iter()
1712            .map(|(address, storage)| {
1713                storage
1714                    .into_iter()
1715                    .map(|key| -> ProviderResult<_> {
1716                        Ok(plain_storage
1717                            .seek_by_key_subkey(address, key)?
1718                            .filter(|v| v.key == key)
1719                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1720                    })
1721                    .collect::<ProviderResult<Vec<_>>>()
1722                    .map(|storage| (address, storage))
1723            })
1724            .collect::<ProviderResult<Vec<(_, _)>>>()
1725    }
1726
1727    fn changed_storages_with_range(
1728        &self,
1729        range: RangeInclusive<BlockNumber>,
1730    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1731        self.tx
1732            .cursor_read::<tables::StorageChangeSets>()?
1733            .walk_range(BlockNumberAddress::range(range))?
1734            // fold all storages and save its old state so we can remove it from HashedStorage
1735            // it is needed as it is dup table.
1736            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1737                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1738                accounts.entry(address).or_default().insert(storage_entry.key);
1739                Ok(accounts)
1740            })
1741    }
1742
1743    fn changed_storages_and_blocks_with_range(
1744        &self,
1745        range: RangeInclusive<BlockNumber>,
1746    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1747        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1748
1749        let storage_changeset_lists =
1750            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1751                BTreeMap::new(),
1752                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1753                    let (index, storage) = entry?;
1754                    storages
1755                        .entry((index.address(), storage.key))
1756                        .or_default()
1757                        .push(index.block_number());
1758                    Ok(storages)
1759                },
1760            )?;
1761
1762        Ok(storage_changeset_lists)
1763    }
1764}
1765
1766impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1767    for DatabaseProvider<TX, N>
1768{
1769    type Receipt = ReceiptTy<N>;
1770
1771    fn write_state(
1772        &self,
1773        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1774        is_value_known: OriginalValuesKnown,
1775        write_receipts_to: StorageLocation,
1776    ) -> ProviderResult<()> {
1777        let first_block = execution_outcome.first_block();
1778        let block_count = execution_outcome.len() as u64;
1779        let last_block = execution_outcome.last_block();
1780        let block_range = first_block..=last_block;
1781
1782        let tip = self.last_block_number()?.max(last_block);
1783
1784        let (plain_state, reverts) =
1785            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1786
1787        self.write_state_reverts(reverts, first_block)?;
1788        self.write_state_changes(plain_state)?;
1789
1790        // Fetch the first transaction number for each block in the range
1791        let block_indices: Vec<_> = self
1792            .block_body_indices_range(block_range)?
1793            .into_iter()
1794            .map(|b| b.first_tx_num)
1795            .collect();
1796
1797        // Ensure all expected blocks are present.
1798        if block_indices.len() < block_count as usize {
1799            let missing_blocks = block_count - block_indices.len() as u64;
1800            return Err(ProviderError::BlockBodyIndicesNotFound(
1801                last_block.saturating_sub(missing_blocks - 1),
1802            ));
1803        }
1804
1805        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1806
1807        // Prepare receipts cursor if we are going to write receipts to the database
1808        //
1809        // We are writing to database if requested or if there's any kind of receipt pruning
1810        // configured
1811        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1812            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1813            .transpose()?;
1814
1815        // Prepare receipts static writer if we are going to write receipts to static files
1816        //
1817        // We are writing to static files if requested and if there's no receipt pruning configured
1818        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1819            !has_receipts_pruning)
1820            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1821            .transpose()?;
1822
1823        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1824        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1825
1826        // All receipts from the last 128 blocks are required for blockchain tree, even with
1827        // [`PruneSegment::ContractLogs`].
1828        let prunable_receipts =
1829            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1830
1831        // Prepare set of addresses which logs should not be pruned.
1832        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1833        for (_, addresses) in contract_log_pruner.range(..first_block) {
1834            allowed_addresses.extend(addresses.iter().copied());
1835        }
1836
1837        for (idx, (receipts, first_tx_index)) in
1838            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1839        {
1840            let block_number = first_block + idx as u64;
1841
1842            // Increment block number for receipts static file writer
1843            if let Some(writer) = receipts_static_writer.as_mut() {
1844                writer.increment_block(block_number)?;
1845            }
1846
1847            // Skip writing receipts if pruning configuration requires us to.
1848            if prunable_receipts &&
1849                self.prune_modes
1850                    .receipts
1851                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1852            {
1853                continue
1854            }
1855
1856            // If there are new addresses to retain after this block number, track them
1857            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1858                allowed_addresses.extend(new_addresses.iter().copied());
1859            }
1860
1861            for (idx, receipt) in receipts.iter().enumerate() {
1862                let receipt_idx = first_tx_index + idx as u64;
1863                // Skip writing receipt if log filter is active and it does not have any logs to
1864                // retain
1865                if prunable_receipts &&
1866                    has_contract_log_filter &&
1867                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1868                {
1869                    continue
1870                }
1871
1872                if let Some(writer) = &mut receipts_static_writer {
1873                    writer.append_receipt(receipt_idx, receipt)?;
1874                }
1875
1876                if let Some(cursor) = &mut receipts_cursor {
1877                    cursor.append(receipt_idx, receipt)?;
1878                }
1879            }
1880        }
1881
1882        Ok(())
1883    }
1884
1885    fn write_state_reverts(
1886        &self,
1887        reverts: PlainStateReverts,
1888        first_block: BlockNumber,
1889    ) -> ProviderResult<()> {
1890        // Write storage changes
1891        tracing::trace!("Writing storage changes");
1892        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1893        let mut storage_changeset_cursor =
1894            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1895        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1896            let block_number = first_block + block_index as BlockNumber;
1897
1898            tracing::trace!(block_number, "Writing block change");
1899            // sort changes by address.
1900            storage_changes.par_sort_unstable_by_key(|a| a.address);
1901            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1902                let storage_id = BlockNumberAddress((block_number, address));
1903
1904                let mut storage = storage_revert
1905                    .into_iter()
1906                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1907                    .collect::<Vec<_>>();
1908                // sort storage slots by key.
1909                storage.par_sort_unstable_by_key(|a| a.0);
1910
1911                // If we are writing the primary storage wipe transition, the pre-existing plain
1912                // storage state has to be taken from the database and written to storage history.
1913                // See [StorageWipe::Primary] for more details.
1914                let mut wiped_storage = Vec::new();
1915                if wiped {
1916                    tracing::trace!(?address, "Wiping storage");
1917                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1918                        wiped_storage.push((entry.key, entry.value));
1919                        while let Some(entry) = storages_cursor.next_dup_val()? {
1920                            wiped_storage.push((entry.key, entry.value))
1921                        }
1922                    }
1923                }
1924
1925                tracing::trace!(?address, ?storage, "Writing storage reverts");
1926                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1927                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1928                }
1929            }
1930        }
1931
1932        // Write account changes
1933        tracing::trace!("Writing account changes");
1934        let mut account_changeset_cursor =
1935            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1936
1937        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1938            let block_number = first_block + block_index as BlockNumber;
1939            // Sort accounts by address.
1940            account_block_reverts.par_sort_by_key(|a| a.0);
1941
1942            for (address, info) in account_block_reverts {
1943                account_changeset_cursor.append_dup(
1944                    block_number,
1945                    AccountBeforeTx { address, info: info.map(Into::into) },
1946                )?;
1947            }
1948        }
1949
1950        Ok(())
1951    }
1952
1953    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1954        // sort all entries so they can be written to database in more performant way.
1955        // and take smaller memory footprint.
1956        changes.accounts.par_sort_by_key(|a| a.0);
1957        changes.storage.par_sort_by_key(|a| a.address);
1958        changes.contracts.par_sort_by_key(|a| a.0);
1959
1960        // Write new account state
1961        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1962        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1963        // write account to database.
1964        for (address, account) in changes.accounts {
1965            if let Some(account) = account {
1966                tracing::trace!(?address, "Updating plain state account");
1967                accounts_cursor.upsert(address, &account.into())?;
1968            } else if accounts_cursor.seek_exact(address)?.is_some() {
1969                tracing::trace!(?address, "Deleting plain state account");
1970                accounts_cursor.delete_current()?;
1971            }
1972        }
1973
1974        // Write bytecode
1975        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1976        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1977        for (hash, bytecode) in changes.contracts {
1978            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1979        }
1980
1981        // Write new storage state and wipe storage if needed.
1982        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1983        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1984        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1985            // Wiping of storage.
1986            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1987                storages_cursor.delete_current_duplicates()?;
1988            }
1989            // cast storages to B256.
1990            let mut storage = storage
1991                .into_iter()
1992                .map(|(k, value)| StorageEntry { key: k.into(), value })
1993                .collect::<Vec<_>>();
1994            // sort storage slots by key.
1995            storage.par_sort_unstable_by_key(|a| a.key);
1996
1997            for entry in storage {
1998                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1999                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2000                    if db_entry.key == entry.key {
2001                        storages_cursor.delete_current()?;
2002                    }
2003                }
2004
2005                if !entry.value.is_zero() {
2006                    storages_cursor.upsert(address, &entry)?;
2007                }
2008            }
2009        }
2010
2011        Ok(())
2012    }
2013
2014    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2015        // Write hashed account updates.
2016        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2017        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2018            if let Some(account) = account {
2019                hashed_accounts_cursor.upsert(hashed_address, &account)?;
2020            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2021                hashed_accounts_cursor.delete_current()?;
2022            }
2023        }
2024
2025        // Write hashed storage changes.
2026        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2027        let mut hashed_storage_cursor =
2028            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2029        for (hashed_address, storage) in sorted_storages {
2030            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2031                hashed_storage_cursor.delete_current_duplicates()?;
2032            }
2033
2034            for (hashed_slot, value) in storage.storage_slots_sorted() {
2035                let entry = StorageEntry { key: hashed_slot, value };
2036                if let Some(db_entry) =
2037                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2038                {
2039                    if db_entry.key == entry.key {
2040                        hashed_storage_cursor.delete_current()?;
2041                    }
2042                }
2043
2044                if !entry.value.is_zero() {
2045                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2046                }
2047            }
2048        }
2049
2050        Ok(())
2051    }
2052
2053    /// Remove the last N blocks of state.
2054    ///
2055    /// The latest state will be unwound
2056    ///
2057    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2058    ///    transaction ids.
2059    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2060    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2061    ///    the changesets.
2062    ///    - In order to have both the old and new values in the changesets, we also access the
2063    ///      plain state tables.
2064    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2065    ///    we:
2066    ///     1. Take the old value from the changeset
2067    ///     2. Take the new value from the plain state
2068    ///     3. Save the old value to the local state
2069    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2070    ///    have seen before we:
2071    ///     1. Take the old value from the changeset
2072    ///     2. Take the new value from the local state
2073    ///     3. Set the local state to the value in the changeset
2074    fn remove_state_above(
2075        &self,
2076        block: BlockNumber,
2077        remove_receipts_from: StorageLocation,
2078    ) -> ProviderResult<()> {
2079        let range = block + 1..=self.last_block_number()?;
2080
2081        if range.is_empty() {
2082            return Ok(());
2083        }
2084
2085        // We are not removing block meta as it is used to get block changesets.
2086        let block_bodies = self.block_body_indices_range(range.clone())?;
2087
2088        // get transaction receipts
2089        let from_transaction_num =
2090            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2091
2092        let storage_range = BlockNumberAddress::range(range.clone());
2093
2094        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2095        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2096
2097        // This is not working for blocks that are not at tip. as plain state is not the last
2098        // state of end range. We should rename the functions or add support to access
2099        // History state. Accessing history state can be tricky but we are not gaining
2100        // anything.
2101        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2102        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2103
2104        let (state, _) = self.populate_bundle_state(
2105            account_changeset,
2106            storage_changeset,
2107            &mut plain_accounts_cursor,
2108            &mut plain_storage_cursor,
2109        )?;
2110
2111        // iterate over local plain state remove all account and all storages.
2112        for (address, (old_account, new_account, storage)) in &state {
2113            // revert account if needed.
2114            if old_account != new_account {
2115                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2116                if let Some(account) = old_account {
2117                    plain_accounts_cursor.upsert(*address, account)?;
2118                } else if existing_entry.is_some() {
2119                    plain_accounts_cursor.delete_current()?;
2120                }
2121            }
2122
2123            // revert storages
2124            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2125                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2126                // delete previous value
2127                // TODO: This does not use dupsort features
2128                if plain_storage_cursor
2129                    .seek_by_key_subkey(*address, *storage_key)?
2130                    .filter(|s| s.key == *storage_key)
2131                    .is_some()
2132                {
2133                    plain_storage_cursor.delete_current()?
2134                }
2135
2136                // insert value if needed
2137                if !old_storage_value.is_zero() {
2138                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2139                }
2140            }
2141        }
2142
2143        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2144
2145        Ok(())
2146    }
2147
2148    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2149    ///
2150    /// The latest state will be unwound and returned back with all the blocks
2151    ///
2152    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2153    ///    transaction ids.
2154    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2155    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2156    ///    the changesets.
2157    ///    - In order to have both the old and new values in the changesets, we also access the
2158    ///      plain state tables.
2159    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2160    ///    we:
2161    ///     1. Take the old value from the changeset
2162    ///     2. Take the new value from the plain state
2163    ///     3. Save the old value to the local state
2164    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2165    ///    have seen before we:
2166    ///     1. Take the old value from the changeset
2167    ///     2. Take the new value from the local state
2168    ///     3. Set the local state to the value in the changeset
2169    fn take_state_above(
2170        &self,
2171        block: BlockNumber,
2172        remove_receipts_from: StorageLocation,
2173    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2174        let range = block + 1..=self.last_block_number()?;
2175
2176        if range.is_empty() {
2177            return Ok(ExecutionOutcome::default())
2178        }
2179        let start_block_number = *range.start();
2180
2181        // We are not removing block meta as it is used to get block changesets.
2182        let block_bodies = self.block_body_indices_range(range.clone())?;
2183
2184        // get transaction receipts
2185        let from_transaction_num =
2186            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2187        let to_transaction_num =
2188            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2189
2190        let storage_range = BlockNumberAddress::range(range.clone());
2191
2192        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2193        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2194
2195        // This is not working for blocks that are not at tip. as plain state is not the last
2196        // state of end range. We should rename the functions or add support to access
2197        // History state. Accessing history state can be tricky but we are not gaining
2198        // anything.
2199        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2200        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2201
2202        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2203        // remove, and return later
2204        let (state, reverts) = self.populate_bundle_state(
2205            account_changeset,
2206            storage_changeset,
2207            &mut plain_accounts_cursor,
2208            &mut plain_storage_cursor,
2209        )?;
2210
2211        // iterate over local plain state remove all account and all storages.
2212        for (address, (old_account, new_account, storage)) in &state {
2213            // revert account if needed.
2214            if old_account != new_account {
2215                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2216                if let Some(account) = old_account {
2217                    plain_accounts_cursor.upsert(*address, account)?;
2218                } else if existing_entry.is_some() {
2219                    plain_accounts_cursor.delete_current()?;
2220                }
2221            }
2222
2223            // revert storages
2224            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2225                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2226                // delete previous value
2227                // TODO: This does not use dupsort features
2228                if plain_storage_cursor
2229                    .seek_by_key_subkey(*address, *storage_key)?
2230                    .filter(|s| s.key == *storage_key)
2231                    .is_some()
2232                {
2233                    plain_storage_cursor.delete_current()?
2234                }
2235
2236                // insert value if needed
2237                if !old_storage_value.is_zero() {
2238                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2239                }
2240            }
2241        }
2242
2243        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2244        let mut receipts_iter = self
2245            .static_file_provider
2246            .get_range_with_static_file_or_database(
2247                StaticFileSegment::Receipts,
2248                from_transaction_num..to_transaction_num + 1,
2249                |static_file, range, _| {
2250                    static_file
2251                        .receipts_by_tx_range(range.clone())
2252                        .map(|r| range.into_iter().zip(r).collect())
2253                },
2254                |range, _| {
2255                    self.tx
2256                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2257                        .walk_range(range)?
2258                        .map(|r| r.map_err(Into::into))
2259                        .collect()
2260                },
2261                |_| true,
2262            )?
2263            .into_iter()
2264            .peekable();
2265
2266        let mut receipts = Vec::with_capacity(block_bodies.len());
2267        // loop break if we are at the end of the blocks.
2268        for block_body in block_bodies {
2269            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2270            for num in block_body.tx_num_range() {
2271                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2272                    block_receipts.push(receipts_iter.next().unwrap().1);
2273                }
2274            }
2275            receipts.push(block_receipts);
2276        }
2277
2278        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2279
2280        Ok(ExecutionOutcome::new_init(
2281            state,
2282            reverts,
2283            Vec::new(),
2284            receipts,
2285            start_block_number,
2286            Vec::new(),
2287        ))
2288    }
2289}
2290
2291impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2292    /// Writes trie updates. Returns the number of entries modified.
2293    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2294        if trie_updates.is_empty() {
2295            return Ok(0)
2296        }
2297
2298        // Track the number of inserted entries.
2299        let mut num_entries = 0;
2300
2301        // Merge updated and removed nodes. Updated nodes must take precedence.
2302        let mut account_updates = trie_updates
2303            .removed_nodes_ref()
2304            .iter()
2305            .filter_map(|n| {
2306                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2307            })
2308            .collect::<Vec<_>>();
2309        account_updates.extend(
2310            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2311        );
2312        // Sort trie node updates.
2313        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2314
2315        let tx = self.tx_ref();
2316        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2317        for (key, updated_node) in account_updates {
2318            let nibbles = StoredNibbles(*key);
2319            match updated_node {
2320                Some(node) => {
2321                    if !nibbles.0.is_empty() {
2322                        num_entries += 1;
2323                        account_trie_cursor.upsert(nibbles, node)?;
2324                    }
2325                }
2326                None => {
2327                    num_entries += 1;
2328                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2329                        account_trie_cursor.delete_current()?;
2330                    }
2331                }
2332            }
2333        }
2334
2335        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2336
2337        Ok(num_entries)
2338    }
2339}
2340
2341impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2342    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2343    /// updates by the hashed address, writing in sorted order.
2344    fn write_storage_trie_updates(
2345        &self,
2346        storage_tries: &B256Map<StorageTrieUpdates>,
2347    ) -> ProviderResult<usize> {
2348        let mut num_entries = 0;
2349        let mut storage_tries = Vec::from_iter(storage_tries);
2350        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2351        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2352        for (hashed_address, storage_trie_updates) in storage_tries {
2353            let mut db_storage_trie_cursor =
2354                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2355            num_entries +=
2356                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2357            cursor = db_storage_trie_cursor.cursor;
2358        }
2359
2360        Ok(num_entries)
2361    }
2362
2363    fn write_individual_storage_trie_updates(
2364        &self,
2365        hashed_address: B256,
2366        updates: &StorageTrieUpdates,
2367    ) -> ProviderResult<usize> {
2368        if updates.is_empty() {
2369            return Ok(0)
2370        }
2371
2372        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2373        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2374        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2375    }
2376}
2377
2378impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2379    fn unwind_account_hashing<'a>(
2380        &self,
2381        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2382    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2383        // Aggregate all block changesets and make a list of accounts that have been changed.
2384        // Note that collecting and then reversing the order is necessary to ensure that the
2385        // changes are applied in the correct order.
2386        let hashed_accounts = changesets
2387            .into_iter()
2388            .map(|(_, e)| (keccak256(e.address), e.info))
2389            .collect::<Vec<_>>()
2390            .into_iter()
2391            .rev()
2392            .collect::<BTreeMap<_, _>>();
2393
2394        // Apply values to HashedState, and remove the account if it's None.
2395        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2396        for (hashed_address, account) in &hashed_accounts {
2397            if let Some(account) = account {
2398                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2399            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2400                hashed_accounts_cursor.delete_current()?;
2401            }
2402        }
2403
2404        Ok(hashed_accounts)
2405    }
2406
2407    fn unwind_account_hashing_range(
2408        &self,
2409        range: impl RangeBounds<BlockNumber>,
2410    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2411        let changesets = self
2412            .tx
2413            .cursor_read::<tables::AccountChangeSets>()?
2414            .walk_range(range)?
2415            .collect::<Result<Vec<_>, _>>()?;
2416        self.unwind_account_hashing(changesets.iter())
2417    }
2418
2419    fn insert_account_for_hashing(
2420        &self,
2421        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2422    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2423        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2424        let hashed_accounts =
2425            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2426        for (hashed_address, account) in &hashed_accounts {
2427            if let Some(account) = account {
2428                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2429            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2430                hashed_accounts_cursor.delete_current()?;
2431            }
2432        }
2433        Ok(hashed_accounts)
2434    }
2435
2436    fn unwind_storage_hashing(
2437        &self,
2438        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2439    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2440        // Aggregate all block changesets and make list of accounts that have been changed.
2441        let mut hashed_storages = changesets
2442            .into_iter()
2443            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2444                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2445            })
2446            .collect::<Vec<_>>();
2447        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2448
2449        // Apply values to HashedState, and remove the account if it's None.
2450        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2451            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2452        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2453        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2454            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2455
2456            if hashed_storage
2457                .seek_by_key_subkey(hashed_address, key)?
2458                .filter(|entry| entry.key == key)
2459                .is_some()
2460            {
2461                hashed_storage.delete_current()?;
2462            }
2463
2464            if !value.is_zero() {
2465                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2466            }
2467        }
2468        Ok(hashed_storage_keys)
2469    }
2470
2471    fn unwind_storage_hashing_range(
2472        &self,
2473        range: impl RangeBounds<BlockNumberAddress>,
2474    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2475        let changesets = self
2476            .tx
2477            .cursor_read::<tables::StorageChangeSets>()?
2478            .walk_range(range)?
2479            .collect::<Result<Vec<_>, _>>()?;
2480        self.unwind_storage_hashing(changesets.into_iter())
2481    }
2482
2483    fn insert_storage_for_hashing(
2484        &self,
2485        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2486    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2487        // hash values
2488        let hashed_storages =
2489            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2490                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2491                    map.insert(keccak256(entry.key), entry.value);
2492                    map
2493                });
2494                map.insert(keccak256(address), storage);
2495                map
2496            });
2497
2498        let hashed_storage_keys = hashed_storages
2499            .iter()
2500            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2501            .collect();
2502
2503        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2504        // Hash the address and key and apply them to HashedStorage (if Storage is None
2505        // just remove it);
2506        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2507            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2508                if hashed_storage_cursor
2509                    .seek_by_key_subkey(hashed_address, key)?
2510                    .filter(|entry| entry.key == key)
2511                    .is_some()
2512                {
2513                    hashed_storage_cursor.delete_current()?;
2514                }
2515
2516                if !value.is_zero() {
2517                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2518                }
2519                Ok(())
2520            })
2521        })?;
2522
2523        Ok(hashed_storage_keys)
2524    }
2525
2526    fn insert_hashes(
2527        &self,
2528        range: RangeInclusive<BlockNumber>,
2529        end_block_hash: B256,
2530        expected_state_root: B256,
2531    ) -> ProviderResult<()> {
2532        // Initialize prefix sets.
2533        let mut account_prefix_set = PrefixSetMut::default();
2534        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2535        let mut destroyed_accounts = HashSet::default();
2536
2537        let mut durations_recorder = metrics::DurationsRecorder::default();
2538
2539        // storage hashing stage
2540        {
2541            let lists = self.changed_storages_with_range(range.clone())?;
2542            let storages = self.plain_state_storages(lists)?;
2543            let storage_entries = self.insert_storage_for_hashing(storages)?;
2544            for (hashed_address, hashed_slots) in storage_entries {
2545                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2546                for slot in hashed_slots {
2547                    storage_prefix_sets
2548                        .entry(hashed_address)
2549                        .or_default()
2550                        .insert(Nibbles::unpack(slot));
2551                }
2552            }
2553        }
2554        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2555
2556        // account hashing stage
2557        {
2558            let lists = self.changed_accounts_with_range(range.clone())?;
2559            let accounts = self.basic_accounts(lists)?;
2560            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2561            for (hashed_address, account) in hashed_addresses {
2562                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2563                if account.is_none() {
2564                    destroyed_accounts.insert(hashed_address);
2565                }
2566            }
2567        }
2568        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2569
2570        // merkle tree
2571        {
2572            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2573            // are pre-loaded.
2574            let prefix_sets = TriePrefixSets {
2575                account_prefix_set: account_prefix_set.freeze(),
2576                storage_prefix_sets: storage_prefix_sets
2577                    .into_iter()
2578                    .map(|(k, v)| (k, v.freeze()))
2579                    .collect(),
2580                destroyed_accounts,
2581            };
2582            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2583                .with_prefix_sets(prefix_sets)
2584                .root_with_updates()
2585                .map_err(reth_db_api::DatabaseError::from)?;
2586            if state_root != expected_state_root {
2587                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2588                    root: GotExpected { got: state_root, expected: expected_state_root },
2589                    block_number: *range.end(),
2590                    block_hash: end_block_hash,
2591                })))
2592            }
2593            self.write_trie_updates(&trie_updates)?;
2594        }
2595        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2596
2597        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2598
2599        Ok(())
2600    }
2601}
2602
2603impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2604    fn unwind_account_history_indices<'a>(
2605        &self,
2606        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2607    ) -> ProviderResult<usize> {
2608        let mut last_indices = changesets
2609            .into_iter()
2610            .map(|(index, account)| (account.address, *index))
2611            .collect::<Vec<_>>();
2612        last_indices.sort_by_key(|(a, _)| *a);
2613
2614        // Unwind the account history index.
2615        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2616        for &(address, rem_index) in &last_indices {
2617            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2618                &mut cursor,
2619                ShardedKey::last(address),
2620                rem_index,
2621                |sharded_key| sharded_key.key == address,
2622            )?;
2623
2624            // Check the last returned partial shard.
2625            // If it's not empty, the shard needs to be reinserted.
2626            if !partial_shard.is_empty() {
2627                cursor.insert(
2628                    ShardedKey::last(address),
2629                    &BlockNumberList::new_pre_sorted(partial_shard),
2630                )?;
2631            }
2632        }
2633
2634        let changesets = last_indices.len();
2635        Ok(changesets)
2636    }
2637
2638    fn unwind_account_history_indices_range(
2639        &self,
2640        range: impl RangeBounds<BlockNumber>,
2641    ) -> ProviderResult<usize> {
2642        let changesets = self
2643            .tx
2644            .cursor_read::<tables::AccountChangeSets>()?
2645            .walk_range(range)?
2646            .collect::<Result<Vec<_>, _>>()?;
2647        self.unwind_account_history_indices(changesets.iter())
2648    }
2649
2650    fn insert_account_history_index(
2651        &self,
2652        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2653    ) -> ProviderResult<()> {
2654        self.append_history_index::<_, tables::AccountsHistory>(
2655            account_transitions,
2656            ShardedKey::new,
2657        )
2658    }
2659
2660    fn unwind_storage_history_indices(
2661        &self,
2662        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2663    ) -> ProviderResult<usize> {
2664        let mut storage_changesets = changesets
2665            .into_iter()
2666            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2667            .collect::<Vec<_>>();
2668        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2669
2670        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2671        for &(address, storage_key, rem_index) in &storage_changesets {
2672            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2673                &mut cursor,
2674                StorageShardedKey::last(address, storage_key),
2675                rem_index,
2676                |storage_sharded_key| {
2677                    storage_sharded_key.address == address &&
2678                        storage_sharded_key.sharded_key.key == storage_key
2679                },
2680            )?;
2681
2682            // Check the last returned partial shard.
2683            // If it's not empty, the shard needs to be reinserted.
2684            if !partial_shard.is_empty() {
2685                cursor.insert(
2686                    StorageShardedKey::last(address, storage_key),
2687                    &BlockNumberList::new_pre_sorted(partial_shard),
2688                )?;
2689            }
2690        }
2691
2692        let changesets = storage_changesets.len();
2693        Ok(changesets)
2694    }
2695
2696    fn unwind_storage_history_indices_range(
2697        &self,
2698        range: impl RangeBounds<BlockNumberAddress>,
2699    ) -> ProviderResult<usize> {
2700        let changesets = self
2701            .tx
2702            .cursor_read::<tables::StorageChangeSets>()?
2703            .walk_range(range)?
2704            .collect::<Result<Vec<_>, _>>()?;
2705        self.unwind_storage_history_indices(changesets.into_iter())
2706    }
2707
2708    fn insert_storage_history_index(
2709        &self,
2710        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2711    ) -> ProviderResult<()> {
2712        self.append_history_index::<_, tables::StoragesHistory>(
2713            storage_transitions,
2714            |(address, storage_key), highest_block_number| {
2715                StorageShardedKey::new(address, storage_key, highest_block_number)
2716            },
2717        )
2718    }
2719
2720    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2721        // account history stage
2722        {
2723            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2724            self.insert_account_history_index(indices)?;
2725        }
2726
2727        // storage history stage
2728        {
2729            let indices = self.changed_storages_and_blocks_with_range(range)?;
2730            self.insert_storage_history_index(indices)?;
2731        }
2732
2733        Ok(())
2734    }
2735}
2736
2737impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2738    for DatabaseProvider<TX, N>
2739{
2740    fn take_block_and_execution_above(
2741        &self,
2742        block: BlockNumber,
2743        remove_from: StorageLocation,
2744    ) -> ProviderResult<Chain<Self::Primitives>> {
2745        let range = block + 1..=self.last_block_number()?;
2746
2747        self.unwind_trie_state_range(range.clone())?;
2748
2749        // get execution res
2750        let execution_state = self.take_state_above(block, remove_from)?;
2751
2752        let blocks = self.recovered_block_range(range)?;
2753
2754        // remove block bodies it is needed for both get block range and get block execution results
2755        // that is why it is deleted afterwards.
2756        self.remove_blocks_above(block, remove_from)?;
2757
2758        // Update pipeline progress
2759        self.update_pipeline_stages(block, true)?;
2760
2761        Ok(Chain::new(blocks, execution_state, None))
2762    }
2763
2764    fn remove_block_and_execution_above(
2765        &self,
2766        block: BlockNumber,
2767        remove_from: StorageLocation,
2768    ) -> ProviderResult<()> {
2769        let range = block + 1..=self.last_block_number()?;
2770
2771        self.unwind_trie_state_range(range)?;
2772
2773        // remove execution res
2774        self.remove_state_above(block, remove_from)?;
2775
2776        // remove block bodies it is needed for both get block range and get block execution results
2777        // that is why it is deleted afterwards.
2778        self.remove_blocks_above(block, remove_from)?;
2779
2780        // Update pipeline progress
2781        self.update_pipeline_stages(block, true)?;
2782
2783        Ok(())
2784    }
2785}
2786
2787impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2788    for DatabaseProvider<TX, N>
2789{
2790    type Block = BlockTy<N>;
2791    type Receipt = ReceiptTy<N>;
2792
2793    /// Inserts the block into the database, always modifying the following tables:
2794    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2795    /// * [`Headers`](tables::Headers)
2796    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2797    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2798    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2799    ///
2800    /// If there are transactions in the block, the following tables will be modified:
2801    /// * [`Transactions`](tables::Transactions)
2802    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2803    ///
2804    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2805    /// If withdrawals are not empty, this will modify
2806    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2807    ///
2808    /// If the provider has __not__ configured full sender pruning, this will modify
2809    /// [`TransactionSenders`](tables::TransactionSenders).
2810    ///
2811    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2812    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2813    fn insert_block(
2814        &self,
2815        block: RecoveredBlock<Self::Block>,
2816        write_to: StorageLocation,
2817    ) -> ProviderResult<StoredBlockBodyIndices> {
2818        let block_number = block.number();
2819
2820        let mut durations_recorder = metrics::DurationsRecorder::default();
2821
2822        // total difficulty
2823        let ttd = if block_number == 0 {
2824            block.header().difficulty()
2825        } else {
2826            let parent_block_number = block_number - 1;
2827            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2828            durations_recorder.record_relative(metrics::Action::GetParentTD);
2829            parent_ttd + block.header().difficulty()
2830        };
2831
2832        if write_to.database() {
2833            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2834            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2835
2836            // Put header with canonical hashes.
2837            self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2838            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2839
2840            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2841            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2842        }
2843
2844        if write_to.static_files() {
2845            let mut writer =
2846                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2847            writer.append_header(block.header(), ttd, &block.hash())?;
2848        }
2849
2850        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2851        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2852
2853        let mut next_tx_num = self
2854            .tx
2855            .cursor_read::<tables::TransactionBlocks>()?
2856            .last()?
2857            .map(|(n, _)| n + 1)
2858            .unwrap_or_default();
2859        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2860        let first_tx_num = next_tx_num;
2861
2862        let tx_count = block.body().transaction_count() as u64;
2863
2864        // Ensures we have all the senders for the block's transactions.
2865        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2866            let hash = transaction.tx_hash();
2867
2868            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2869                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2870            }
2871
2872            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2873                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2874            }
2875            next_tx_num += 1;
2876        }
2877
2878        self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2879
2880        debug!(
2881            target: "providers::db",
2882            ?block_number,
2883            actions = ?durations_recorder.actions,
2884            "Inserted block"
2885        );
2886
2887        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2888    }
2889
2890    fn append_block_bodies(
2891        &self,
2892        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2893        write_to: StorageLocation,
2894    ) -> ProviderResult<()> {
2895        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2896
2897        // Initialize writer if we will be writing transactions to staticfiles
2898        let mut tx_static_writer = write_to
2899            .static_files()
2900            .then(|| {
2901                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2902            })
2903            .transpose()?;
2904
2905        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2906        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2907
2908        // Initialize cursor if we will be writing transactions to database
2909        let mut tx_cursor = write_to
2910            .database()
2911            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2912            .transpose()?;
2913
2914        // Get id for the next tx_num or zero if there are no transactions.
2915        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2916
2917        for (block_number, body) in &bodies {
2918            // Increment block on static file header.
2919            if let Some(writer) = tx_static_writer.as_mut() {
2920                writer.increment_block(*block_number)?;
2921            }
2922
2923            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2924            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2925
2926            let mut durations_recorder = metrics::DurationsRecorder::default();
2927
2928            // insert block meta
2929            block_indices_cursor.append(*block_number, &block_indices)?;
2930
2931            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2932
2933            let Some(body) = body else { continue };
2934
2935            // write transaction block index
2936            if !body.transactions().is_empty() {
2937                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2938                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2939            }
2940
2941            // write transactions
2942            for transaction in body.transactions() {
2943                if let Some(writer) = tx_static_writer.as_mut() {
2944                    writer.append_transaction(next_tx_num, transaction)?;
2945                }
2946                if let Some(cursor) = tx_cursor.as_mut() {
2947                    cursor.append(next_tx_num, transaction)?;
2948                }
2949
2950                // Increment transaction id for each transaction.
2951                next_tx_num += 1;
2952            }
2953        }
2954
2955        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2956
2957        Ok(())
2958    }
2959
2960    fn remove_blocks_above(
2961        &self,
2962        block: BlockNumber,
2963        remove_from: StorageLocation,
2964    ) -> ProviderResult<()> {
2965        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2966            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2967        }
2968
2969        // Only prune canonical headers after we've removed the block hashes as we rely on data from
2970        // this table in `canonical_hashes_range`.
2971        self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2972        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2973        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2974
2975        // First transaction to be removed
2976        let unwind_tx_from = self
2977            .block_body_indices(block)?
2978            .map(|b| b.next_tx_num())
2979            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2980
2981        // Last transaction to be removed
2982        let unwind_tx_to = self
2983            .tx
2984            .cursor_read::<tables::BlockBodyIndices>()?
2985            .last()?
2986            // shouldn't happen because this was OK above
2987            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
2988            .1
2989            .last_tx_num();
2990
2991        if unwind_tx_from <= unwind_tx_to {
2992            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
2993                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
2994            }
2995        }
2996
2997        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
2998
2999        self.remove_bodies_above(block, remove_from)?;
3000
3001        Ok(())
3002    }
3003
3004    fn remove_bodies_above(
3005        &self,
3006        block: BlockNumber,
3007        remove_from: StorageLocation,
3008    ) -> ProviderResult<()> {
3009        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3010
3011        // First transaction to be removed
3012        let unwind_tx_from = self
3013            .block_body_indices(block)?
3014            .map(|b| b.next_tx_num())
3015            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3016
3017        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3018        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3019
3020        if remove_from.database() {
3021            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3022        }
3023
3024        if remove_from.static_files() {
3025            let static_file_tx_num = self
3026                .static_file_provider
3027                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3028
3029            let to_delete = static_file_tx_num
3030                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3031                .unwrap_or_default();
3032
3033            self.static_file_provider
3034                .latest_writer(StaticFileSegment::Transactions)?
3035                .prune_transactions(to_delete, block)?;
3036        }
3037
3038        Ok(())
3039    }
3040
3041    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3042    fn append_blocks_with_state(
3043        &self,
3044        blocks: Vec<RecoveredBlock<Self::Block>>,
3045        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3046        hashed_state: HashedPostStateSorted,
3047        trie_updates: TrieUpdates,
3048    ) -> ProviderResult<()> {
3049        if blocks.is_empty() {
3050            debug!(target: "providers::db", "Attempted to append empty block range");
3051            return Ok(())
3052        }
3053
3054        // Blocks are not empty, so no need to handle the case of `blocks.first()` being
3055        // `None`.
3056        let first_number = blocks[0].number();
3057
3058        // Blocks are not empty, so no need to handle the case of `blocks.last()` being
3059        // `None`.
3060        let last_block_number = blocks[blocks.len() - 1].number();
3061
3062        let mut durations_recorder = metrics::DurationsRecorder::default();
3063
3064        // Insert the blocks
3065        for block in blocks {
3066            self.insert_block(block, StorageLocation::Database)?;
3067            durations_recorder.record_relative(metrics::Action::InsertBlock);
3068        }
3069
3070        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3071        durations_recorder.record_relative(metrics::Action::InsertState);
3072
3073        // insert hashes and intermediate merkle nodes
3074        self.write_hashed_state(&hashed_state)?;
3075        self.write_trie_updates(&trie_updates)?;
3076        durations_recorder.record_relative(metrics::Action::InsertHashes);
3077
3078        self.update_history_indices(first_number..=last_block_number)?;
3079        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3080
3081        // Update pipeline progress
3082        self.update_pipeline_stages(last_block_number, false)?;
3083        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3084
3085        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3086
3087        Ok(())
3088    }
3089}
3090
3091impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3092    fn get_prune_checkpoint(
3093        &self,
3094        segment: PruneSegment,
3095    ) -> ProviderResult<Option<PruneCheckpoint>> {
3096        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3097    }
3098
3099    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3100        Ok(self
3101            .tx
3102            .cursor_read::<tables::PruneCheckpoints>()?
3103            .walk(None)?
3104            .collect::<Result<_, _>>()?)
3105    }
3106}
3107
3108impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3109    fn save_prune_checkpoint(
3110        &self,
3111        segment: PruneSegment,
3112        checkpoint: PruneCheckpoint,
3113    ) -> ProviderResult<()> {
3114        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3115    }
3116}
3117
3118impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3119    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3120        let db_entries = self.tx.entries::<T>()?;
3121        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3122            Ok(entries) => entries,
3123            Err(ProviderError::UnsupportedProvider) => 0,
3124            Err(err) => return Err(err),
3125        };
3126
3127        Ok(db_entries + static_file_entries)
3128    }
3129}
3130
3131impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3132    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3133        let mut finalized_blocks = self
3134            .tx
3135            .cursor_read::<tables::ChainState>()?
3136            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3137            .take(1)
3138            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3139
3140        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3141        Ok(last_finalized_block_number)
3142    }
3143
3144    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3145        let mut finalized_blocks = self
3146            .tx
3147            .cursor_read::<tables::ChainState>()?
3148            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3149            .take(1)
3150            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3151
3152        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3153        Ok(last_finalized_block_number)
3154    }
3155}
3156
3157impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3158    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3159        Ok(self
3160            .tx
3161            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3162    }
3163
3164    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3165        Ok(self
3166            .tx
3167            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3168    }
3169}
3170
3171impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3172    type Tx = TX;
3173
3174    fn tx_ref(&self) -> &Self::Tx {
3175        &self.tx
3176    }
3177
3178    fn tx_mut(&mut self) -> &mut Self::Tx {
3179        &mut self.tx
3180    }
3181
3182    fn into_tx(self) -> Self::Tx {
3183        self.tx
3184    }
3185
3186    fn prune_modes_ref(&self) -> &PruneModes {
3187        self.prune_modes_ref()
3188    }
3189}
3190
3191#[cfg(test)]
3192mod tests {
3193    use super::*;
3194    use crate::{
3195        test_utils::{blocks::BlockchainTestData, create_test_provider_factory},
3196        BlockWriter,
3197    };
3198    use reth_testing_utils::generators::{self, random_block, BlockParams};
3199
3200    #[test]
3201    fn test_receipts_by_block_range_empty_range() {
3202        let factory = create_test_provider_factory();
3203        let provider = factory.provider().unwrap();
3204
3205        // empty range should return empty vec
3206        let start = 10u64;
3207        let end = 9u64;
3208        let result = provider.receipts_by_block_range(start..=end).unwrap();
3209        assert_eq!(result, Vec::<Vec<reth_ethereum_primitives::Receipt>>::new());
3210    }
3211
3212    #[test]
3213    fn test_receipts_by_block_range_nonexistent_blocks() {
3214        let factory = create_test_provider_factory();
3215        let provider = factory.provider().unwrap();
3216
3217        // non-existent blocks should return empty vecs for each block
3218        let result = provider.receipts_by_block_range(10..=12).unwrap();
3219        assert_eq!(result, vec![vec![], vec![], vec![]]);
3220    }
3221
3222    #[test]
3223    fn test_receipts_by_block_range_single_block() {
3224        let factory = create_test_provider_factory();
3225        let data = BlockchainTestData::default();
3226
3227        let provider_rw = factory.provider_rw().unwrap();
3228        provider_rw
3229            .insert_block(
3230                data.genesis.clone().try_recover().unwrap(),
3231                crate::StorageLocation::Database,
3232            )
3233            .unwrap();
3234        provider_rw
3235            .insert_block(data.blocks[0].0.clone(), crate::StorageLocation::Database)
3236            .unwrap();
3237        provider_rw
3238            .write_state(
3239                &data.blocks[0].1,
3240                crate::OriginalValuesKnown::No,
3241                crate::StorageLocation::Database,
3242            )
3243            .unwrap();
3244        provider_rw.commit().unwrap();
3245
3246        let provider = factory.provider().unwrap();
3247        let result = provider.receipts_by_block_range(1..=1).unwrap();
3248
3249        // should have one vec with one receipt
3250        assert_eq!(result.len(), 1);
3251        assert_eq!(result[0].len(), 1);
3252        assert_eq!(result[0][0], data.blocks[0].1.receipts()[0][0]);
3253    }
3254
3255    #[test]
3256    fn test_receipts_by_block_range_multiple_blocks() {
3257        let factory = create_test_provider_factory();
3258        let data = BlockchainTestData::default();
3259
3260        let provider_rw = factory.provider_rw().unwrap();
3261        provider_rw
3262            .insert_block(
3263                data.genesis.clone().try_recover().unwrap(),
3264                crate::StorageLocation::Database,
3265            )
3266            .unwrap();
3267        for i in 0..3 {
3268            provider_rw
3269                .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3270                .unwrap();
3271            provider_rw
3272                .write_state(
3273                    &data.blocks[i].1,
3274                    crate::OriginalValuesKnown::No,
3275                    crate::StorageLocation::Database,
3276                )
3277                .unwrap();
3278        }
3279        provider_rw.commit().unwrap();
3280
3281        let provider = factory.provider().unwrap();
3282        let result = provider.receipts_by_block_range(1..=3).unwrap();
3283
3284        // should have 3 vecs, each with one receipt
3285        assert_eq!(result.len(), 3);
3286        for (i, block_receipts) in result.iter().enumerate() {
3287            assert_eq!(block_receipts.len(), 1);
3288            assert_eq!(block_receipts[0], data.blocks[i].1.receipts()[0][0]);
3289        }
3290    }
3291
3292    #[test]
3293    fn test_receipts_by_block_range_blocks_with_varying_tx_counts() {
3294        let factory = create_test_provider_factory();
3295        let data = BlockchainTestData::default();
3296
3297        let provider_rw = factory.provider_rw().unwrap();
3298        provider_rw
3299            .insert_block(
3300                data.genesis.clone().try_recover().unwrap(),
3301                crate::StorageLocation::Database,
3302            )
3303            .unwrap();
3304
3305        // insert blocks 1-3 with receipts
3306        for i in 0..3 {
3307            provider_rw
3308                .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3309                .unwrap();
3310            provider_rw
3311                .write_state(
3312                    &data.blocks[i].1,
3313                    crate::OriginalValuesKnown::No,
3314                    crate::StorageLocation::Database,
3315                )
3316                .unwrap();
3317        }
3318        provider_rw.commit().unwrap();
3319
3320        let provider = factory.provider().unwrap();
3321        let result = provider.receipts_by_block_range(1..=3).unwrap();
3322
3323        // verify each block has one receipt
3324        assert_eq!(result.len(), 3);
3325        for block_receipts in &result {
3326            assert_eq!(block_receipts.len(), 1);
3327        }
3328    }
3329
3330    #[test]
3331    fn test_receipts_by_block_range_partial_range() {
3332        let factory = create_test_provider_factory();
3333        let data = BlockchainTestData::default();
3334
3335        let provider_rw = factory.provider_rw().unwrap();
3336        provider_rw
3337            .insert_block(
3338                data.genesis.clone().try_recover().unwrap(),
3339                crate::StorageLocation::Database,
3340            )
3341            .unwrap();
3342        for i in 0..3 {
3343            provider_rw
3344                .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3345                .unwrap();
3346            provider_rw
3347                .write_state(
3348                    &data.blocks[i].1,
3349                    crate::OriginalValuesKnown::No,
3350                    crate::StorageLocation::Database,
3351                )
3352                .unwrap();
3353        }
3354        provider_rw.commit().unwrap();
3355
3356        let provider = factory.provider().unwrap();
3357
3358        // request range that includes both existing and non-existing blocks
3359        let result = provider.receipts_by_block_range(2..=5).unwrap();
3360        assert_eq!(result.len(), 4);
3361
3362        // blocks 2-3 should have receipts, blocks 4-5 should be empty
3363        assert_eq!(result[0].len(), 1); // block 2
3364        assert_eq!(result[1].len(), 1); // block 3
3365        assert_eq!(result[2].len(), 0); // block 4 (doesn't exist)
3366        assert_eq!(result[3].len(), 0); // block 5 (doesn't exist)
3367
3368        assert_eq!(result[0][0], data.blocks[1].1.receipts()[0][0]);
3369        assert_eq!(result[1][0], data.blocks[2].1.receipts()[0][0]);
3370    }
3371
3372    #[test]
3373    fn test_receipts_by_block_range_all_empty_blocks() {
3374        let factory = create_test_provider_factory();
3375        let mut rng = generators::rng();
3376
3377        // create blocks with no transactions
3378        let mut blocks = Vec::new();
3379        for i in 1..=3 {
3380            let block =
3381                random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() });
3382            blocks.push(block);
3383        }
3384
3385        let provider_rw = factory.provider_rw().unwrap();
3386        for block in blocks {
3387            provider_rw
3388                .insert_block(block.try_recover().unwrap(), crate::StorageLocation::Database)
3389                .unwrap();
3390        }
3391        provider_rw.commit().unwrap();
3392
3393        let provider = factory.provider().unwrap();
3394        let result = provider.receipts_by_block_range(1..=3).unwrap();
3395
3396        assert_eq!(result.len(), 3);
3397        for block_receipts in result {
3398            assert_eq!(block_receipts.len(), 0);
3399        }
3400    }
3401
3402    #[test]
3403    fn test_receipts_by_block_range_consistency_with_individual_calls() {
3404        let factory = create_test_provider_factory();
3405        let data = BlockchainTestData::default();
3406
3407        let provider_rw = factory.provider_rw().unwrap();
3408        provider_rw
3409            .insert_block(
3410                data.genesis.clone().try_recover().unwrap(),
3411                crate::StorageLocation::Database,
3412            )
3413            .unwrap();
3414        for i in 0..3 {
3415            provider_rw
3416                .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database)
3417                .unwrap();
3418            provider_rw
3419                .write_state(
3420                    &data.blocks[i].1,
3421                    crate::OriginalValuesKnown::No,
3422                    crate::StorageLocation::Database,
3423                )
3424                .unwrap();
3425        }
3426        provider_rw.commit().unwrap();
3427
3428        let provider = factory.provider().unwrap();
3429
3430        // get receipts using block range method
3431        let range_result = provider.receipts_by_block_range(1..=3).unwrap();
3432
3433        // get receipts using individual block calls
3434        let mut individual_results = Vec::new();
3435        for block_num in 1..=3 {
3436            let receipts =
3437                provider.receipts_by_block(block_num.into()).unwrap().unwrap_or_default();
3438            individual_results.push(receipts);
3439        }
3440
3441        assert_eq!(range_result, individual_results);
3442    }
3443}