reth_provider/providers/database/provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
15    HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter, LatestStateProvider,
16    LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader,
17    PruneCheckpointWriter, RevertsInit, StageCheckpointReader, StateCommitmentProvider,
18    StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation,
19    StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
20    TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
21};
22use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt};
23use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
24use alloy_primitives::{
25    keccak256,
26    map::{hash_map, B256Map, HashMap, HashSet},
27    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
28};
29use itertools::Itertools;
30use rayon::slice::ParallelSliceMut;
31use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
32use reth_db_api::{
33    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
34    database::Database,
35    models::{
36        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
37        ShardedKey, StoredBlockBodyIndices,
38    },
39    table::Table,
40    tables,
41    transaction::{DbTx, DbTxMut},
42    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
43};
44use reth_execution_types::{Chain, ExecutionOutcome};
45use reth_network_p2p::headers::downloader::SyncTarget;
46use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
47use reth_primitives::{
48    Account, Bytecode, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
49    StaticFileSegment, StorageEntry,
50};
51use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction};
52use reth_prune_types::{
53    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
54};
55use reth_stages_types::{StageCheckpoint, StageId};
56use reth_storage_api::{
57    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
58    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
59};
60use reth_storage_errors::provider::{ProviderResult, RootMismatch};
61use reth_trie::{
62    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
63    updates::{StorageTrieUpdates, TrieUpdates},
64    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
65};
66use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
67use revm_database::states::{
68    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
69};
70use std::{
71    cmp::Ordering,
72    collections::{BTreeMap, BTreeSet},
73    fmt::Debug,
74    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
75    sync::{mpsc, Arc},
76};
77use tokio::sync::watch;
78use tracing::{debug, trace};
79
80/// A [`DatabaseProvider`] that holds a read-only database transaction.
81pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
82
83/// A [`DatabaseProvider`] that holds a read-write database transaction.
84///
/// Ideally this would be an alias type. However, a compiler error (<https://github.com/rust-lang/rust/issues/102211>) forces us to wrap this in a struct instead.
/// Once that issue is solved, we can revert to an alias type.
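///
/// # Example
///
/// A minimal sketch of typical usage, assuming `provider_factory` is an already-configured
/// provider factory from the surrounding setup: the `Deref`/`DerefMut` impls below expose the
/// inner [`DatabaseProvider`], and [`Self::commit`] finalizes the transaction.
///
/// ```ignore
/// let provider_rw = provider_factory.provider_rw()?;
/// // Deref gives access to the inner `DatabaseProvider` methods.
/// let _latest = provider_rw.latest();
/// // Commit the database transaction.
/// provider_rw.commit()?;
/// ```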
87#[derive(Debug)]
88pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
89    pub DatabaseProvider<<DB as Database>::TXMut, N>,
90);
91
92impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
93    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
94
95    fn deref(&self) -> &Self::Target {
96        &self.0
97    }
98}
99
100impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
101    fn deref_mut(&mut self) -> &mut Self::Target {
102        &mut self.0
103    }
104}
105
106impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
107    for DatabaseProviderRW<DB, N>
108{
109    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
110        &self.0
111    }
112}
113
114impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
115    /// Commit database transaction and static file if it exists.
116    pub fn commit(self) -> ProviderResult<bool> {
117        self.0.commit()
118    }
119
120    /// Consume `DbTx` or `DbTxMut`.
121    pub fn into_tx(self) -> <DB as Database>::TXMut {
122        self.0.into_tx()
123    }
124}
125
126impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
127    for DatabaseProvider<<DB as Database>::TXMut, N>
128{
129    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
130        provider.0
131    }
132}
133
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Examples: [`HeaderProvider`], [`BlockHashReader`].
136#[derive(Debug)]
137pub struct DatabaseProvider<TX, N: NodeTypes> {
138    /// Database transaction.
139    tx: TX,
140    /// Chain spec
141    chain_spec: Arc<N::ChainSpec>,
142    /// Static File provider
143    static_file_provider: StaticFileProvider<N::Primitives>,
144    /// Pruning configuration
145    prune_modes: PruneModes,
146    /// Node storage handler.
147    storage: Arc<N::Storage>,
148}
149
150impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
151    /// Returns reference to prune modes.
152    pub const fn prune_modes_ref(&self) -> &PruneModes {
153        &self.prune_modes
154    }
155}
156
157impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
158    /// State provider for latest state
159    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
160        trace!(target: "providers::db", "Returning latest state provider");
161        Box::new(LatestStateProviderRef::new(self))
162    }
163
    /// State provider for the state at the given block hash.
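    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `provider` is a [`DatabaseProvider`] over a read transaction
    /// and `hash` / `address` are known values:
    ///
    /// ```ignore
    /// let state = provider.history_by_block_hash(hash)?;
    /// let account = state.basic_account(&address)?;
    /// ```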
165    pub fn history_by_block_hash<'a>(
166        &'a self,
167        block_hash: BlockHash,
168    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
169        let mut block_number =
170            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
171        if block_number == self.best_block_number().unwrap_or_default() &&
172            block_number == self.last_block_number().unwrap_or_default()
173        {
174            return Ok(Box::new(LatestStateProviderRef::new(self)))
175        }
176
177        // +1 as the changeset that we want is the one that was applied after this block.
178        block_number += 1;
179
180        let account_history_prune_checkpoint =
181            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
182        let storage_history_prune_checkpoint =
183            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
184
185        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
186
        // If we pruned account or storage history, we can't return state for every historical block.
        // Instead, we cap it at the latest prune checkpoint for the corresponding prune segment.
189        if let Some(prune_checkpoint_block_number) =
190            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
191        {
192            state_provider = state_provider.with_lowest_available_account_history_block_number(
193                prune_checkpoint_block_number + 1,
194            );
195        }
196        if let Some(prune_checkpoint_block_number) =
197            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198        {
199            state_provider = state_provider.with_lowest_available_storage_history_block_number(
200                prune_checkpoint_block_number + 1,
201            );
202        }
203
204        Ok(Box::new(state_provider))
205    }
206
207    #[cfg(feature = "test-utils")]
208    /// Sets the prune modes for provider.
209    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
210        self.prune_modes = prune_modes;
211    }
212}
213
214impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
215    type Primitives = N::Primitives;
216}
217
218impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
219    /// Returns a static file provider
220    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
221        self.static_file_provider.clone()
222    }
223}
224
225impl<TX: Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
226    for DatabaseProvider<TX, N>
227{
228    type ChainSpec = N::ChainSpec;
229
230    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
231        self.chain_spec.clone()
232    }
233}
234
235impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
236    /// Creates a provider with an inner read-write transaction.
237    pub const fn new_rw(
238        tx: TX,
239        chain_spec: Arc<N::ChainSpec>,
240        static_file_provider: StaticFileProvider<N::Primitives>,
241        prune_modes: PruneModes,
242        storage: Arc<N::Storage>,
243    ) -> Self {
244        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
245    }
246}
247
248impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
249    fn as_ref(&self) -> &Self {
250        self
251    }
252}
253
254impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
255    /// Unwinds trie state for the given range.
256    ///
    /// This includes calculating the resulting state root and comparing it with the parent block
258    /// state root.
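    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `provider` is a read-write provider and blocks `100..=110` are
    /// being unwound; the call fails with [`ProviderError::UnwindStateRootMismatch`] if the
    /// recomputed root does not match the state root of block `99`:
    ///
    /// ```ignore
    /// provider.unwind_trie_state_range(100..=110)?;
    /// ```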
259    pub fn unwind_trie_state_range(
260        &self,
261        range: RangeInclusive<BlockNumber>,
262    ) -> ProviderResult<()> {
263        let changed_accounts = self
264            .tx
265            .cursor_read::<tables::AccountChangeSets>()?
266            .walk_range(range.clone())?
267            .collect::<Result<Vec<_>, _>>()?;
268
269        // Unwind account hashes. Add changed accounts to account prefix set.
270        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
271        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
272        let mut destroyed_accounts = HashSet::default();
273        for (hashed_address, account) in hashed_addresses {
274            account_prefix_set.insert(Nibbles::unpack(hashed_address));
275            if account.is_none() {
276                destroyed_accounts.insert(hashed_address);
277            }
278        }
279
280        // Unwind account history indices.
281        self.unwind_account_history_indices(changed_accounts.iter())?;
282        let storage_range = BlockNumberAddress::range(range.clone());
283
284        let changed_storages = self
285            .tx
286            .cursor_read::<tables::StorageChangeSets>()?
287            .walk_range(storage_range)?
288            .collect::<Result<Vec<_>, _>>()?;
289
290        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
291        // sets.
292        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
293        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
294        for (hashed_address, hashed_slots) in storage_entries {
295            account_prefix_set.insert(Nibbles::unpack(hashed_address));
296            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
297            for slot in hashed_slots {
298                storage_prefix_set.insert(Nibbles::unpack(slot));
299            }
300            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
301        }
302
303        // Unwind storage history indices.
304        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
305
306        // Calculate the reverted merkle root.
307        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
308        // are pre-loaded.
309        let prefix_sets = TriePrefixSets {
310            account_prefix_set: account_prefix_set.freeze(),
311            storage_prefix_sets,
312            destroyed_accounts,
313        };
314        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
315            .with_prefix_sets(prefix_sets)
316            .root_with_updates()
317            .map_err(reth_db_api::DatabaseError::from)?;
318
319        let parent_number = range.start().saturating_sub(1);
320        let parent_state_root = self
321            .header_by_number(parent_number)?
322            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
323            .state_root();
324
        // The state root should always be correct as we are reverting state,
        // but for the sake of double verification we check it again.
327        if new_state_root != parent_state_root {
328            let parent_hash = self
329                .block_hash(parent_number)?
330                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
331            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
332                root: GotExpected { got: new_state_root, expected: parent_state_root },
333                block_number: parent_number,
334                block_hash: parent_hash,
335            })))
336        }
337        self.write_trie_updates(&trie_updates)?;
338
339        Ok(())
340    }
341
    /// Removes receipts for all transactions starting with the provided transaction number (inclusive).
343    fn remove_receipts_from(
344        &self,
345        from_tx: TxNumber,
346        last_block: BlockNumber,
347        remove_from: StorageLocation,
348    ) -> ProviderResult<()> {
349        if remove_from.database() {
350            // iterate over block body and remove receipts
351            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
352        }
353
354        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
355            let static_file_receipt_num =
356                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
357
358            let to_delete = static_file_receipt_num
359                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
360                .unwrap_or_default();
361
362            self.static_file_provider
363                .latest_writer(StaticFileSegment::Receipts)?
364                .prune_receipts(to_delete, last_block)?;
365        }
366
367        Ok(())
368    }
369}
370
371impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
372    fn try_into_history_at_block(
373        self,
374        mut block_number: BlockNumber,
375    ) -> ProviderResult<StateProviderBox> {
        // If the block number is the same as the current best block number on disk, we can use
        // the latest state provider here.
378        if block_number == self.best_block_number().unwrap_or_default() {
379            return Ok(Box::new(LatestStateProvider::new(self)))
380        }
381
382        // +1 as the changeset that we want is the one that was applied after this block.
383        block_number += 1;
384
385        let account_history_prune_checkpoint =
386            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
387        let storage_history_prune_checkpoint =
388            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
389
390        let mut state_provider = HistoricalStateProvider::new(self, block_number);
391
        // If we pruned account or storage history, we can't return state for every historical block.
        // Instead, we cap it at the latest prune checkpoint for the corresponding prune segment.
394        if let Some(prune_checkpoint_block_number) =
395            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
396        {
397            state_provider = state_provider.with_lowest_available_account_history_block_number(
398                prune_checkpoint_block_number + 1,
399            );
400        }
401        if let Some(prune_checkpoint_block_number) =
402            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403        {
404            state_provider = state_provider.with_lowest_available_storage_history_block_number(
405                prune_checkpoint_block_number + 1,
406            );
407        }
408
409        Ok(Box::new(state_provider))
410    }
411}
412
413impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
414    type StateCommitment = N::StateCommitment;
415}
416
417impl<
418        Tx: DbTx + DbTxMut + 'static,
419        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
420    > DatabaseProvider<Tx, N>
421{
    // TODO: uncomment below once `reth debug_cmd` has been feature-gated with dev.
    // #[cfg(any(test, feature = "test-utils"))]
    /// Inserts a historical block. **Used for setting up test environments.**
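    ///
    /// # Example
    ///
    /// A minimal test-setup sketch, assuming `provider` is a read-write provider and `block` is a
    /// [`RecoveredBlock`] built by the test; headers for any gap below the block are backfilled
    /// into the static files automatically:
    ///
    /// ```ignore
    /// let _body_indices = provider.insert_historical_block(block)?;
    /// ```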
425    pub fn insert_historical_block(
426        &self,
427        block: RecoveredBlock<<Self as BlockWriter>::Block>,
428    ) -> ProviderResult<StoredBlockBodyIndices> {
429        let ttd = if block.number() == 0 {
430            block.header().difficulty()
431        } else {
432            let parent_block_number = block.number() - 1;
433            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
434            parent_ttd + block.header().difficulty()
435        };
436
437        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
438
        // Backfill: some tests start at a block number past genesis, but static files must not have gaps.
440        let segment_header = writer.user_header();
441        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
442            for block_number in 0..block.number() {
443                let mut prev = block.clone_header();
444                prev.number = block_number;
445                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
446            }
447        }
448
449        writer.append_header(block.header(), ttd, &block.hash())?;
450
451        self.insert_block(block, StorageLocation::Database)
452    }
453}
454
/// For a given key, unwind all history shards that are below the given block number.
///
/// * `S` - sharded key subtype.
/// * `T` - table to walk over.
/// * `C` - cursor implementation.
///
/// This function walks the entries from the given start key and deletes all shards that belong to
/// the key and are below the given block number.
///
/// The boundary shard (the shard that is split by the block number) is removed from the database.
/// Any indices that are above the block number are filtered out. The boundary shard is returned
/// for reinsertion (if it's not empty).
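///
/// Illustration of unwinding to block `15` for a single key, under an assumed shard layout and
/// with the start key pointing at the key's `u64::MAX` shard:
///
/// ```text
/// shards before: [key, highest: 10]       -> [1, 5, 10]
///                [key, highest: u64::MAX] -> [12, 14, 16, 20]
///
/// unwind_history_shards(block_number = 15):
///   - the `u64::MAX` shard is the boundary shard: it is deleted and the indices
///     below 15 ([12, 14]) are returned for reinsertion
///   - the [1, 5, 10] shard is untouched, because the walk returns at the
///     boundary shard
/// ```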
467fn unwind_history_shards<S, T, C>(
468    cursor: &mut C,
469    start_key: T::Key,
470    block_number: BlockNumber,
471    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
472) -> ProviderResult<Vec<u64>>
473where
474    T: Table<Value = BlockNumberList>,
475    T::Key: AsRef<ShardedKey<S>>,
476    C: DbCursorRO<T> + DbCursorRW<T>,
477{
478    let mut item = cursor.seek_exact(start_key)?;
479    while let Some((sharded_key, list)) = item {
480        // If the shard does not belong to the key, break.
481        if !shard_belongs_to_key(&sharded_key) {
482            break
483        }
484        cursor.delete_current()?;
485
        // Check the first item.
        // If it is greater than or equal to the block number, the whole shard was unwound and
        // stays deleted; move to the previous shard.
488        let first = list.iter().next().expect("List can't be empty");
489        if first >= block_number {
490            item = cursor.prev()?;
491            continue
492        } else if block_number <= sharded_key.as_ref().highest_block_number {
493            // Filter out all elements greater than block number.
494            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
495        }
496        return Ok(list.iter().collect::<Vec<_>>())
497    }
498
499    Ok(Vec::new())
500}
501
502impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
503    /// Creates a provider with an inner read-only transaction.
504    pub const fn new(
505        tx: TX,
506        chain_spec: Arc<N::ChainSpec>,
507        static_file_provider: StaticFileProvider<N::Primitives>,
508        prune_modes: PruneModes,
509        storage: Arc<N::Storage>,
510    ) -> Self {
511        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
512    }
513
514    /// Consume `DbTx` or `DbTxMut`.
515    pub fn into_tx(self) -> TX {
516        self.tx
517    }
518
    /// Returns a mutable reference to the inner `DbTx` or `DbTxMut`.
520    pub fn tx_mut(&mut self) -> &mut TX {
521        &mut self.tx
522    }
523
    /// Returns a shared reference to the inner `DbTx` or `DbTxMut`.
525    pub const fn tx_ref(&self) -> &TX {
526        &self.tx
527    }
528
529    /// Returns a reference to the chain specification.
530    pub fn chain_spec(&self) -> &N::ChainSpec {
531        &self.chain_spec
532    }
533}
534
535impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
536    fn transactions_by_tx_range_with_cursor<C>(
537        &self,
538        range: impl RangeBounds<TxNumber>,
539        cursor: &mut C,
540    ) -> ProviderResult<Vec<TxTy<N>>>
541    where
542        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
543    {
544        self.static_file_provider.get_range_with_static_file_or_database(
545            StaticFileSegment::Transactions,
546            to_range(range),
547            |static_file, range, _| static_file.transactions_by_tx_range(range),
548            |range, _| self.cursor_collect(cursor, range),
549            |_| true,
550        )
551    }
552
553    fn recovered_block<H, HF, B, BF>(
554        &self,
555        id: BlockHashOrNumber,
556        _transaction_kind: TransactionVariant,
557        header_by_number: HF,
558        construct_block: BF,
559    ) -> ProviderResult<Option<B>>
560    where
561        H: AsRef<HeaderTy<N>>,
562        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
563        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
564    {
565        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
566        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
567
568        // Get the block body
569        //
570        // If the body indices are not found, this means that the transactions either do not exist
        // in the database yet, or they do exist but are not indexed. If they exist but are not
        // indexed, we don't have enough information to return the block anyway, so we return
573        // `None`.
574        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
575
576        let tx_range = body.tx_num_range();
577
578        let (transactions, senders) = if tx_range.is_empty() {
579            (vec![], vec![])
580        } else {
581            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
582        };
583
584        let body = self
585            .storage
586            .reader()
587            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
588            .pop()
589            .ok_or(ProviderError::InvalidStorageOutput)?;
590
591        construct_block(header, body, senders)
592    }
593
594    /// Returns a range of blocks from the database.
595    ///
596    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
597    /// construct blocks from the following inputs:
    ///     - Header
    ///     - Range of transaction numbers
    ///     - Ommers
    ///     - Withdrawals
    ///     - Senders
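    ///
    /// A sketch of how the [`BlockReader::block_range`] implementation below drives this helper
    /// (closures taken from that implementation):
    ///
    /// ```ignore
    /// self.block_range(
    ///     range,
    ///     |range| self.headers_range(range),
    ///     |header, body, _| Ok(Self::Block::new(header, body)),
    /// )
    /// ```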
603    fn block_range<F, H, HF, R>(
604        &self,
605        range: RangeInclusive<BlockNumber>,
606        headers_range: HF,
607        mut assemble_block: F,
608    ) -> ProviderResult<Vec<R>>
609    where
610        H: AsRef<HeaderTy<N>>,
611        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
612        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
613    {
614        if range.is_empty() {
615            return Ok(Vec::new())
616        }
617
618        let len = range.end().saturating_sub(*range.start()) as usize;
619        let mut blocks = Vec::with_capacity(len);
620
621        let headers = headers_range(range.clone())?;
622        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
623
        // If the body indices are not found, this means that the transactions either do not
        // exist in the database yet, or they do exist but are not indexed. If they exist but are
        // not indexed, we don't have enough information to return the block anyway, so we skip
        // the block.
629        let present_headers = self
630            .block_body_indices_range(range)?
631            .into_iter()
632            .map(|b| b.tx_num_range())
633            .zip(headers)
634            .collect::<Vec<_>>();
635
636        let mut inputs = Vec::new();
637        for (tx_range, header) in &present_headers {
638            let transactions = if tx_range.is_empty() {
639                Vec::new()
640            } else {
641                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
642            };
643
644            inputs.push((header.as_ref(), transactions));
645        }
646
647        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
648
649        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
650            blocks.push(assemble_block(header, body, tx_range)?);
651        }
652
653        Ok(blocks)
654    }
655
656    /// Returns a range of blocks from the database, along with the senders of each
657    /// transaction in the blocks.
658    ///
659    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
660    /// construct blocks from the following inputs:
    ///     - Header
    ///     - Transactions
    ///     - Ommers
    ///     - Withdrawals
    ///     - Senders
666    fn block_with_senders_range<H, HF, B, BF>(
667        &self,
668        range: RangeInclusive<BlockNumber>,
669        headers_range: HF,
670        assemble_block: BF,
671    ) -> ProviderResult<Vec<B>>
672    where
673        H: AsRef<HeaderTy<N>>,
674        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
675        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
676    {
677        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
678
679        self.block_range(range, headers_range, |header, body, tx_range| {
680            let senders = if tx_range.is_empty() {
681                Vec::new()
682            } else {
683                // fetch senders from the senders table
684                let known_senders =
685                    senders_cursor
686                        .walk_range(tx_range.clone())?
687                        .collect::<Result<HashMap<_, _>, _>>()?;
688
689                let mut senders = Vec::with_capacity(body.transactions().len());
690                for (tx_num, tx) in tx_range.zip(body.transactions()) {
691                    match known_senders.get(&tx_num) {
692                        None => {
693                            // recover the sender from the transaction if not found
694                            let sender = tx.recover_signer_unchecked()?;
695                            senders.push(sender);
696                        }
697                        Some(sender) => senders.push(*sender),
698                    }
699                }
700
701                senders
702            };
703
704            assemble_block(header, body, senders)
705        })
706    }
707
708    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
709    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
710    /// account changesets.
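    ///
    /// A loose illustration of the maps that are built up (field names are descriptive, not
    /// literal):
    ///
    /// ```text
    /// BundleStateInit: address -> (old_info, new_info, storage)
    ///   old_info = account before the range (from the changeset)
    ///   new_info = account in the plain state (i.e. after the range)
    ///   storage  = slot -> (old_value, new_value)
    ///
    /// RevertsInit: block_number -> address -> (Some(old_info), [old storage entries])
    /// ```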
711    fn populate_bundle_state<A, S>(
712        &self,
713        account_changeset: Vec<(u64, AccountBeforeTx)>,
714        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
715        plain_accounts_cursor: &mut A,
716        plain_storage_cursor: &mut S,
717    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
718    where
719        A: DbCursorRO<PlainAccountState>,
720        S: DbDupCursorRO<PlainStorageState>,
721    {
        // Iterate over the previous values and get the plain state value to create the changeset.
        // The double `Option` around `Account` represents whether the account state is known
        // (first option) and whether the account is removed (second option).
725        let mut state: BundleStateInit = HashMap::default();
726
        // This does not work for blocks that are not at the tip, as the plain state is not the
        // last state of the end of the range. We should rename the functions or add support for
        // accessing history state. Accessing history state can be tricky, but we are not gaining
        // anything.
731
732        let mut reverts: RevertsInit = HashMap::default();
733
734        // add account changeset changes
735        for (block_number, account_before) in account_changeset.into_iter().rev() {
736            let AccountBeforeTx { info: old_info, address } = account_before;
737            match state.entry(address) {
738                hash_map::Entry::Vacant(entry) => {
739                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
740                    entry.insert((old_info, new_info, HashMap::default()));
741                }
742                hash_map::Entry::Occupied(mut entry) => {
743                    // overwrite old account state.
744                    entry.get_mut().0 = old_info;
745                }
746            }
747            // insert old info into reverts.
748            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
749        }
750
751        // add storage changeset changes
752        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
753            let BlockNumberAddress((block_number, address)) = block_and_address;
754            // get account state or insert from plain state.
755            let account_state = match state.entry(address) {
756                hash_map::Entry::Vacant(entry) => {
757                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
758                    entry.insert((present_info, present_info, HashMap::default()))
759                }
760                hash_map::Entry::Occupied(entry) => entry.into_mut(),
761            };
762
763            // match storage.
764            match account_state.2.entry(old_storage.key) {
765                hash_map::Entry::Vacant(entry) => {
766                    let new_storage = plain_storage_cursor
767                        .seek_by_key_subkey(address, old_storage.key)?
768                        .filter(|storage| storage.key == old_storage.key)
769                        .unwrap_or_default();
770                    entry.insert((old_storage.value, new_storage.value));
771                }
772                hash_map::Entry::Occupied(mut entry) => {
773                    entry.get_mut().0 = old_storage.value;
774                }
775            };
776
777            reverts
778                .entry(block_number)
779                .or_default()
780                .entry(address)
781                .or_default()
782                .1
783                .push(old_storage);
784        }
785
786        Ok((state, reverts))
787    }
788}
789
790impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
791    /// Commit database transaction.
792    pub fn commit(self) -> ProviderResult<bool> {
793        Ok(self.tx.commit()?)
794    }
795
    /// Loads the shard for the given key and removes it. If the returned list is empty, the last
    /// shard was full or there are no shards at all.
798    fn take_shard<T>(
799        &self,
800        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
801        key: T::Key,
802    ) -> ProviderResult<Vec<u64>>
803    where
804        T: Table<Value = BlockNumberList>,
805    {
806        if let Some((_, list)) = cursor.seek_exact(key)? {
807            // delete old shard so new one can be inserted.
808            cursor.delete_current()?;
809            let list = list.iter().collect::<Vec<_>>();
810            return Ok(list)
811        }
812        Ok(Vec::new())
813    }
814
    /// Inserts history indices into the database.
816    ///
817    /// For each updated partial key, this function removes the last shard from
818    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
819    /// inserts the new shards back into the database.
820    ///
821    /// This function is used by history indexing stages.
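    ///
    /// Illustration with an assumed shard capacity of 3 (the real capacity is
    /// [`sharded_key::NUM_OF_INDICES_IN_SHARD`]):
    ///
    /// ```text
    /// existing last shard: [key, u64::MAX] -> [1, 2]
    /// new indices:         [3, 4, 5]
    ///
    /// after appending and re-chunking:
    ///   [key, 3]        -> [1, 2, 3]
    ///   [key, u64::MAX] -> [4, 5]
    /// ```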
822    fn append_history_index<P, T>(
823        &self,
824        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
825        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
826    ) -> ProviderResult<()>
827    where
828        P: Copy,
829        T: Table<Value = BlockNumberList>,
830    {
831        let mut cursor = self.tx.cursor_write::<T>()?;
832        for (partial_key, indices) in index_updates {
833            let mut last_shard =
834                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
835            last_shard.extend(indices);
            // Chunk the indices and insert them in shards of `NUM_OF_INDICES_IN_SHARD` size.
837            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
838            while let Some(list) = chunks.next() {
839                let highest_block_number = if chunks.peek().is_some() {
840                    *list.last().expect("`chunks` does not return empty list")
841                } else {
842                    // Insert last list with `u64::MAX`.
843                    u64::MAX
844                };
845                cursor.insert(
846                    sharded_key_factory(partial_key, highest_block_number),
847                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
848                )?;
849            }
850        }
851        Ok(())
852    }
853}
854
855impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
856    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
857        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
858    }
859}
860
861impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
862    fn changed_accounts_with_range(
863        &self,
864        range: impl RangeBounds<BlockNumber>,
865    ) -> ProviderResult<BTreeSet<Address>> {
866        self.tx
867            .cursor_read::<tables::AccountChangeSets>()?
868            .walk_range(range)?
869            .map(|entry| {
870                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
871            })
872            .collect()
873    }
874
875    fn basic_accounts(
876        &self,
877        iter: impl IntoIterator<Item = Address>,
878    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
879        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
880        Ok(iter
881            .into_iter()
882            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
883            .collect::<Result<Vec<_>, _>>()?)
884    }
885
886    fn changed_accounts_and_blocks_with_range(
887        &self,
888        range: RangeInclusive<BlockNumber>,
889    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
890        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
891
892        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
893            BTreeMap::new(),
894            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
895                let (index, account) = entry?;
896                accounts.entry(account.address).or_default().push(index);
897                Ok(accounts)
898            },
899        )?;
900
901        Ok(account_transitions)
902    }
903}
904
905impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
906    fn storage_changeset(
907        &self,
908        block_number: BlockNumber,
909    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
910        let range = block_number..=block_number;
911        let storage_range = BlockNumberAddress::range(range);
912        self.tx
913            .cursor_dup_read::<tables::StorageChangeSets>()?
914            .walk_range(storage_range)?
915            .map(|result| -> ProviderResult<_> { Ok(result?) })
916            .collect()
917    }
918}
919
920impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
921    fn account_block_changeset(
922        &self,
923        block_number: BlockNumber,
924    ) -> ProviderResult<Vec<AccountBeforeTx>> {
925        let range = block_number..=block_number;
926        self.tx
927            .cursor_read::<tables::AccountChangeSets>()?
928            .walk_range(range)?
929            .map(|result| -> ProviderResult<_> {
930                let (_, account_before) = result?;
931                Ok(account_before)
932            })
933            .collect()
934    }
935}
936
937impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
938    for DatabaseProvider<TX, N>
939{
940    type Header = HeaderTy<N>;
941
942    fn sync_gap(
943        &self,
944        tip: watch::Receiver<B256>,
945        highest_uninterrupted_block: BlockNumber,
946    ) -> ProviderResult<HeaderSyncGap<Self::Header>> {
947        let static_file_provider = self.static_file_provider();
948
        // Make sure the Headers static file is at the same height. If it's further ahead, the
        // previous run was interrupted and we need to unwind the static file.
951        let next_static_file_block_num = static_file_provider
952            .get_highest_static_file_block(StaticFileSegment::Headers)
953            .map(|id| id + 1)
954            .unwrap_or_default();
955        let next_block = highest_uninterrupted_block + 1;
956
957        match next_static_file_block_num.cmp(&next_block) {
            // The node shut down after the static file commit but before the database commit,
            // so we need to unwind the static files.
960            Ordering::Greater => {
961                let mut static_file_producer =
962                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
963                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
964                // Since this is a database <-> static file inconsistency, we commit the change
965                // straight away.
966                static_file_producer.commit()?
967            }
968            Ordering::Less => {
                // There are either missing or corrupted files.
970                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
971            }
972            Ordering::Equal => {}
973        }
974
975        let local_head = static_file_provider
976            .sealed_header(highest_uninterrupted_block)?
977            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
978
979        let target = SyncTarget::Tip(*tip.borrow());
980
981        Ok(HeaderSyncGap { local_head, target })
982    }
983}
984
985impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
986    type Header = HeaderTy<N>;
987
988    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
989        if let Some(num) = self.block_number(*block_hash)? {
990            Ok(self.header_by_number(num)?)
991        } else {
992            Ok(None)
993        }
994    }
995
996    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
997        self.static_file_provider.get_with_static_file_or_database(
998            StaticFileSegment::Headers,
999            num,
1000            |static_file| static_file.header_by_number(num),
1001            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1002        )
1003    }
1004
1005    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1006        if let Some(num) = self.block_number(*block_hash)? {
1007            self.header_td_by_number(num)
1008        } else {
1009            Ok(None)
1010        }
1011    }
1012
1013    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1014        if self.chain_spec.is_paris_active_at_block(number) {
1015            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
                // If Paris (the merge) is active at this block, return the final Paris total
                // difficulty.
1018                return Ok(Some(td))
1019            }
1020        }
1021
1022        self.static_file_provider.get_with_static_file_or_database(
1023            StaticFileSegment::Headers,
1024            number,
1025            |static_file| static_file.header_td_by_number(number),
1026            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1027        )
1028    }
1029
1030    fn headers_range(
1031        &self,
1032        range: impl RangeBounds<BlockNumber>,
1033    ) -> ProviderResult<Vec<Self::Header>> {
1034        self.static_file_provider.get_range_with_static_file_or_database(
1035            StaticFileSegment::Headers,
1036            to_range(range),
1037            |static_file, range, _| static_file.headers_range(range),
1038            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1039            |_| true,
1040        )
1041    }
1042
1043    fn sealed_header(
1044        &self,
1045        number: BlockNumber,
1046    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1047        self.static_file_provider.get_with_static_file_or_database(
1048            StaticFileSegment::Headers,
1049            number,
1050            |static_file| static_file.sealed_header(number),
1051            || {
1052                if let Some(header) = self.header_by_number(number)? {
1053                    let hash = self
1054                        .block_hash(number)?
1055                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1056                    Ok(Some(SealedHeader::new(header, hash)))
1057                } else {
1058                    Ok(None)
1059                }
1060            },
1061        )
1062    }
1063
1064    fn sealed_headers_while(
1065        &self,
1066        range: impl RangeBounds<BlockNumber>,
1067        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1068    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1069        self.static_file_provider.get_range_with_static_file_or_database(
1070            StaticFileSegment::Headers,
1071            to_range(range),
1072            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1073            |range, mut predicate| {
1074                let mut headers = vec![];
1075                for entry in
1076                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1077                {
1078                    let (number, header) = entry?;
1079                    let hash = self
1080                        .block_hash(number)?
1081                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1082                    let sealed = SealedHeader::new(header, hash);
1083                    if !predicate(&sealed) {
1084                        break
1085                    }
1086                    headers.push(sealed);
1087                }
1088                Ok(headers)
1089            },
1090            predicate,
1091        )
1092    }
1093}
1094
1095impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1096    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1097        self.static_file_provider.get_with_static_file_or_database(
1098            StaticFileSegment::Headers,
1099            number,
1100            |static_file| static_file.block_hash(number),
1101            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1102        )
1103    }
1104
1105    fn canonical_hashes_range(
1106        &self,
1107        start: BlockNumber,
1108        end: BlockNumber,
1109    ) -> ProviderResult<Vec<B256>> {
1110        self.static_file_provider.get_range_with_static_file_or_database(
1111            StaticFileSegment::Headers,
1112            start..end,
1113            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1114            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1115            |_| true,
1116        )
1117    }
1118}
1119
1120impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1121    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1122        let best_number = self.best_block_number()?;
1123        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1124        Ok(ChainInfo { best_hash, best_number })
1125    }
1126
1127    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        // The best block number is tracked via the Finish stage, which gets updated in the same
        // tx when new blocks are committed.
1130        Ok(self
1131            .get_stage_checkpoint(StageId::Finish)?
1132            .map(|checkpoint| checkpoint.block_number)
1133            .unwrap_or_default())
1134    }
1135
1136    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1137        Ok(self
1138            .tx
1139            .cursor_read::<tables::CanonicalHeaders>()?
1140            .last()?
1141            .map(|(num, _)| num)
1142            .max(
1143                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1144            )
1145            .unwrap_or_default())
1146    }
1147
1148    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1149        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1150    }
1151}
1152
1153impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1154    type Block = BlockTy<N>;
1155
1156    fn find_block_by_hash(
1157        &self,
1158        hash: B256,
1159        source: BlockSource,
1160    ) -> ProviderResult<Option<Self::Block>> {
1161        if source.is_canonical() {
1162            self.block(hash.into())
1163        } else {
1164            Ok(None)
1165        }
1166    }
1167
1168    /// Returns the block with matching number from database.
1169    ///
1170    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist or are not indexed, this
    /// will return `None`.
1173    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1174        if let Some(number) = self.convert_hash_or_number(id)? {
1175            if let Some(header) = self.header_by_number(number)? {
1176                // If the body indices are not found, this means that the transactions either do not
                // exist in the database yet, or they do exist but are not indexed.
                // If they exist but are not indexed, we don't have enough
                // information to return the block anyway, so we return `None`.
1180                let Some(transactions) = self.transactions_by_block(number.into())? else {
1181                    return Ok(None)
1182                };
1183
1184                let body = self
1185                    .storage
1186                    .reader()
1187                    .read_block_bodies(self, vec![(&header, transactions)])?
1188                    .pop()
1189                    .ok_or(ProviderError::InvalidStorageOutput)?;
1190
1191                return Ok(Some(Self::Block::new(header, body)))
1192            }
1193        }
1194
1195        Ok(None)
1196    }
1197
1198    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1199        Ok(None)
1200    }
1201
1202    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1203        Ok(None)
1204    }
1205
1206    fn pending_block_and_receipts(
1207        &self,
1208    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1209        Ok(None)
1210    }
1211
1212    /// Returns the block with senders with matching number or hash from database.
1213    ///
1214    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1215    /// spot, and we want fast querying.**
1216    ///
1217    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist or are not indexed, this
    /// will return `None`.
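    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `provider` implements [`BlockReader`] and `num` is a known
    /// block number:
    ///
    /// ```ignore
    /// let block = provider.recovered_block(num.into(), TransactionVariant::NoHash)?;
    /// ```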
1220    fn recovered_block(
1221        &self,
1222        id: BlockHashOrNumber,
1223        transaction_kind: TransactionVariant,
1224    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1225        self.recovered_block(
1226            id,
1227            transaction_kind,
1228            |block_number| self.header_by_number(block_number),
1229            |header, body, senders| {
1230                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid
                    // txs w.r.t. its height and can ignore the s-value check, so pre-EIP-2 txs
                    // are allowed.
1234                    .try_into_recovered_unchecked(senders)
1235                    .map(Some)
1236                    .map_err(|_| ProviderError::SenderRecoveryError)
1237            },
1238        )
1239    }
1240
1241    fn sealed_block_with_senders(
1242        &self,
1243        id: BlockHashOrNumber,
1244        transaction_kind: TransactionVariant,
1245    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1246        self.recovered_block(
1247            id,
1248            transaction_kind,
1249            |block_number| self.sealed_header(block_number),
1250            |header, body, senders| {
1251                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid
                    // txs w.r.t. its height and can ignore the s-value check, so pre-EIP-2 txs
                    // are allowed.
1255                    .try_with_senders_unchecked(senders)
1256                    .map(Some)
1257                    .map_err(|_| ProviderError::SenderRecoveryError)
1258            },
1259        )
1260    }
1261
1262    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1263        self.block_range(
1264            range,
1265            |range| self.headers_range(range),
1266            |header, body, _| Ok(Self::Block::new(header, body)),
1267        )
1268    }
1269
1270    fn block_with_senders_range(
1271        &self,
1272        range: RangeInclusive<BlockNumber>,
1273    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1274        self.block_with_senders_range(
1275            range,
1276            |range| self.headers_range(range),
1277            |header, body, senders| {
1278                Self::Block::new(header, body)
1279                    .try_into_recovered_unchecked(senders)
1280                    .map_err(|_| ProviderError::SenderRecoveryError)
1281            },
1282        )
1283    }
1284
1285    fn recovered_block_range(
1286        &self,
1287        range: RangeInclusive<BlockNumber>,
1288    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1289        self.block_with_senders_range(
1290            range,
1291            |range| self.sealed_headers_range(range),
1292            |header, body, senders| {
1293                Self::Block::new_sealed(header, body)
1294                    .try_with_senders(senders)
1295                    .map_err(|_| ProviderError::SenderRecoveryError)
1296            },
1297        )
1298    }
1299}
1300
1301impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1302    for DatabaseProvider<TX, N>
1303{
    /// Recovers transaction hashes by walking through the `Transactions` table and
    /// calculating them in parallel. The results are returned unsorted.
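    ///
    /// Outline of the database fallback path (the static file path delegates to the static file
    /// provider):
    ///
    /// ```text
    /// tx range -> split into ~range_len / num_threads chunks
    ///          -> rayon::spawn per chunk: EIP-2718-encode each tx and keccak256 it
    ///          -> collect (hash, tx number) pairs over mpsc channels, unsorted
    /// ```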
1306    fn transaction_hashes_by_range(
1307        &self,
1308        tx_range: Range<TxNumber>,
1309    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1310        self.static_file_provider.get_range_with_static_file_or_database(
1311            StaticFileSegment::Transactions,
1312            tx_range,
1313            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1314            |tx_range, _| {
1315                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1316                let tx_range_size = tx_range.clone().count();
1317                let tx_walker = tx_cursor.walk_range(tx_range)?;
1318
1319                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1320                let mut channels = Vec::with_capacity(chunk_size);
1321                let mut transaction_count = 0;
1322
1323                #[inline]
1324                fn calculate_hash<T>(
1325                    entry: Result<(TxNumber, T), DatabaseError>,
1326                    rlp_buf: &mut Vec<u8>,
1327                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1328                where
1329                    T: Encodable2718,
1330                {
1331                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1332                    tx.encode_2718(rlp_buf);
1333                    Ok((keccak256(rlp_buf), tx_id))
1334                }
1335
1336                for chunk in &tx_walker.chunks(chunk_size) {
1337                    let (tx, rx) = mpsc::channel();
1338                    channels.push(rx);
1339
1340                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1341                    // not Send)
1342                    let chunk: Vec<_> = chunk.collect();
1343                    transaction_count += chunk.len();
1344
1345                    // Spawn the task onto the global rayon pool
1346                    // This task will send the results through the channel after it has calculated
1347                    // the hash.
1348                    rayon::spawn(move || {
1349                        let mut rlp_buf = Vec::with_capacity(128);
1350                        for entry in chunk {
1351                            rlp_buf.clear();
1352                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1353                        }
1354                    });
1355                }
1356                let mut tx_list = Vec::with_capacity(transaction_count);
1357
1358                // Iterate over channels and append the tx hashes unsorted
1359                for channel in channels {
1360                    while let Ok(tx) = channel.recv() {
1361                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1362                        tx_list.push((tx_hash, tx_id));
1363                    }
1364                }
1365
1366                Ok(tx_list)
1367            },
1368            |_| true,
1369        )
1370    }
1371}
1372
1374impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1375    type Transaction = TxTy<N>;
1376
1377    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1378        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1379    }
1380
1381    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1382        self.static_file_provider.get_with_static_file_or_database(
1383            StaticFileSegment::Transactions,
1384            id,
1385            |static_file| static_file.transaction_by_id(id),
1386            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1387        )
1388    }
1389
1390    fn transaction_by_id_unhashed(
1391        &self,
1392        id: TxNumber,
1393    ) -> ProviderResult<Option<Self::Transaction>> {
1394        self.static_file_provider.get_with_static_file_or_database(
1395            StaticFileSegment::Transactions,
1396            id,
1397            |static_file| static_file.transaction_by_id_unhashed(id),
1398            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1399        )
1400    }
1401
1402    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1403        if let Some(id) = self.transaction_id(hash)? {
1404            Ok(self.transaction_by_id_unhashed(id)?)
1405        } else {
1406            Ok(None)
1407        }
1408    }
1409
1410    fn transaction_by_hash_with_meta(
1411        &self,
1412        tx_hash: TxHash,
1413    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1414        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1415        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1416            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1417                if let Some(block_number) =
1418                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1419                {
1420                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1421                        let (header, block_hash) = sealed_header.split();
1422                        if let Some(block_body) = self.block_body_indices(block_number)? {
1423                            // the index of the tx in the block is the offset:
1424                            // len([start..tx_id])
1425                            // NOTE: `transaction_id` is always `>=` the block's first
1426                            // transaction number
1427                            let index = transaction_id - block_body.first_tx_num();
1428
1429                            let meta = TransactionMeta {
1430                                tx_hash,
1431                                index,
1432                                block_hash,
1433                                block_number,
1434                                base_fee: header.base_fee_per_gas(),
1435                                excess_blob_gas: header.excess_blob_gas(),
1436                                timestamp: header.timestamp(),
1437                            };
1438
1439                            return Ok(Some((transaction, meta)))
1440                        }
1441                    }
1442                }
1443            }
1444        }
1445
1446        Ok(None)
1447    }
1448
1449    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1450        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1451        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1452    }
1453
1454    fn transactions_by_block(
1455        &self,
1456        id: BlockHashOrNumber,
1457    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1458        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1459
1460        if let Some(block_number) = self.convert_hash_or_number(id)? {
1461            if let Some(body) = self.block_body_indices(block_number)? {
1462                let tx_range = body.tx_num_range();
1463                return if tx_range.is_empty() {
1464                    Ok(Some(Vec::new()))
1465                } else {
1466                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1467                }
1468            }
1469        }
1470        Ok(None)
1471    }
1472
1473    fn transactions_by_block_range(
1474        &self,
1475        range: impl RangeBounds<BlockNumber>,
1476    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1477        let range = to_range(range);
1478        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1479
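        // `range` is half-open, so convert it to the inclusive block range expected by
        // `block_body_indices_range`.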
1480        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1481            .into_iter()
1482            .map(|body| {
1483                let tx_num_range = body.tx_num_range();
1484                if tx_num_range.is_empty() {
1485                    Ok(Vec::new())
1486                } else {
1487                    Ok(self
1488                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1489                        .into_iter()
1490                        .collect())
1491                }
1492            })
1493            .collect()
1494    }
1495
1496    fn transactions_by_tx_range(
1497        &self,
1498        range: impl RangeBounds<TxNumber>,
1499    ) -> ProviderResult<Vec<Self::Transaction>> {
1500        self.transactions_by_tx_range_with_cursor(
1501            range,
1502            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1503        )
1504    }
1505
1506    fn senders_by_tx_range(
1507        &self,
1508        range: impl RangeBounds<TxNumber>,
1509    ) -> ProviderResult<Vec<Address>> {
1510        self.cursor_read_collect::<tables::TransactionSenders>(range)
1511    }
1512
1513    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1514        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1515    }
1516}
1517
1518impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1519    type Receipt = ReceiptTy<N>;
1520
1521    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1522        self.static_file_provider.get_with_static_file_or_database(
1523            StaticFileSegment::Receipts,
1524            id,
1525            |static_file| static_file.receipt(id),
1526            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1527        )
1528    }
1529
1530    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1531        if let Some(id) = self.transaction_id(hash)? {
1532            self.receipt(id)
1533        } else {
1534            Ok(None)
1535        }
1536    }
1537
1538    fn receipts_by_block(
1539        &self,
1540        block: BlockHashOrNumber,
1541    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1542        if let Some(number) = self.convert_hash_or_number(block)? {
1543            if let Some(body) = self.block_body_indices(number)? {
1544                let tx_range = body.tx_num_range();
1545                return if tx_range.is_empty() {
1546                    Ok(Some(Vec::new()))
1547                } else {
1548                    self.receipts_by_tx_range(tx_range).map(Some)
1549                }
1550            }
1551        }
1552        Ok(None)
1553    }
1554
1555    fn receipts_by_tx_range(
1556        &self,
1557        range: impl RangeBounds<TxNumber>,
1558    ) -> ProviderResult<Vec<Self::Receipt>> {
1559        self.static_file_provider.get_range_with_static_file_or_database(
1560            StaticFileSegment::Receipts,
1561            to_range(range),
1562            |static_file, range, _| static_file.receipts_by_tx_range(range),
1563            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1564            |_| true,
1565        )
1566    }
1567}
1568
1569impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1570    for DatabaseProvider<TX, N>
1571{
1572    fn withdrawals_by_block(
1573        &self,
1574        id: BlockHashOrNumber,
1575        timestamp: u64,
1576    ) -> ProviderResult<Option<Withdrawals>> {
1577        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1578            if let Some(number) = self.convert_hash_or_number(id)? {
1579                return self.static_file_provider.get_with_static_file_or_database(
1580                    StaticFileSegment::BlockMeta,
1581                    number,
1582                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
1583                    || {
1584                        // If we are past Shanghai, then all blocks should have a withdrawal
1585                        // list, even if empty.
1586                        let withdrawals = self
1587                            .tx
1588                            .get::<tables::BlockWithdrawals>(number)
1589                            .map(|w| w.map(|w| w.withdrawals))?
1590                            .unwrap_or_default();
1591                        Ok(Some(withdrawals))
1592                    },
1593                )
1594            }
1595        }
1596        Ok(None)
1597    }
1598}
1599
1600impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1601    /// Returns the ommers for the block with matching id from the database.
1602    ///
1603    /// If the block is not found, this returns `None`.
1604    /// If the block exists, but doesn't contain ommers, this returns `None`.
1605    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1606        if let Some(number) = self.convert_hash_or_number(id)? {
1607            // If the Paris (Merge) hardfork block is known and the block is after it, return
1608            // empty ommers.
1609            if self.chain_spec.is_paris_active_at_block(number) {
1610                return Ok(Some(Vec::new()))
1611            }
1612
1613            return self.static_file_provider.get_with_static_file_or_database(
1614                StaticFileSegment::BlockMeta,
1615                number,
1616                |static_file| static_file.ommers(id),
1617                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
1618            )
1619        }
1620
1621        Ok(None)
1622    }
1623}
1624
1625impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1626    for DatabaseProvider<TX, N>
1627{
1628    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1629        self.static_file_provider.get_with_static_file_or_database(
1630            StaticFileSegment::BlockMeta,
1631            num,
1632            |static_file| static_file.block_body_indices(num),
1633            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
1634        )
1635    }
1636
1637    fn block_body_indices_range(
1638        &self,
1639        range: RangeInclusive<BlockNumber>,
1640    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1641        self.static_file_provider.get_range_with_static_file_or_database(
1642            StaticFileSegment::BlockMeta,
1643            *range.start()..*range.end() + 1,
1644            |static_file, range, _| {
1645                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
1646            },
1647            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
1648            |_| true,
1649        )
1650    }
1651}
1652
1653impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1654    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1655        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1656    }
1657
1658    /// Get stage checkpoint progress.
1659    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1660        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1661    }
1662
1663    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1664        self.tx
1665            .cursor_read::<tables::StageCheckpoints>()?
1666            .walk(None)?
1667            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1668            .map_err(ProviderError::Database)
1669    }
1670}
1671
1672impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1673    /// Save stage checkpoint.
1674    fn save_stage_checkpoint(
1675        &self,
1676        id: StageId,
1677        checkpoint: StageCheckpoint,
1678    ) -> ProviderResult<()> {
1679        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1680    }
1681
1682    /// Save stage checkpoint progress.
1683    fn save_stage_checkpoint_progress(
1684        &self,
1685        id: StageId,
1686        checkpoint: Vec<u8>,
1687    ) -> ProviderResult<()> {
1688        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1689    }
1690
1691    fn update_pipeline_stages(
1692        &self,
1693        block_number: BlockNumber,
1694        drop_stage_checkpoint: bool,
1695    ) -> ProviderResult<()> {
1696        // iterate over all existing stages in the table and update their progress.
1697        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1698        for stage_id in StageId::ALL {
1699            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1700            cursor.upsert(
1701                stage_id.to_string(),
1702                &StageCheckpoint {
1703                    block_number,
1704                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1705                },
1706            )?;
1707        }
1708
1709        Ok(())
1710    }
1711}
1712
1713impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1714    fn plain_state_storages(
1715        &self,
1716        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1717    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1718        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1719
1720        addresses_with_keys
1721            .into_iter()
1722            .map(|(address, storage)| {
1723                storage
1724                    .into_iter()
1725                    .map(|key| -> ProviderResult<_> {
1726                        Ok(plain_storage
1727                            .seek_by_key_subkey(address, key)?
1728                            .filter(|v| v.key == key)
1729                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1730                    })
1731                    .collect::<ProviderResult<Vec<_>>>()
1732                    .map(|storage| (address, storage))
1733            })
1734            .collect::<ProviderResult<Vec<(_, _)>>>()
1735    }
1736
1737    fn changed_storages_with_range(
1738        &self,
1739        range: RangeInclusive<BlockNumber>,
1740    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1741        self.tx
1742            .cursor_read::<tables::StorageChangeSets>()?
1743            .walk_range(BlockNumberAddress::range(range))?
1744            // fold all storages and save their old state so we can remove them from
1745            // HashedStorage; this is needed because it is a dup table.
1746            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1747                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1748                accounts.entry(address).or_default().insert(storage_entry.key);
1749                Ok(accounts)
1750            })
1751    }
1752
1753    fn changed_storages_and_blocks_with_range(
1754        &self,
1755        range: RangeInclusive<BlockNumber>,
1756    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1757        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1758
1759        let storage_changeset_lists =
1760            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1761                BTreeMap::new(),
1762                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1763                    let (index, storage) = entry?;
1764                    storages
1765                        .entry((index.address(), storage.key))
1766                        .or_default()
1767                        .push(index.block_number());
1768                    Ok(storages)
1769                },
1770            )?;
1771
1772        Ok(storage_changeset_lists)
1773    }
1774}
1775
1776impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1777    for DatabaseProvider<TX, N>
1778{
1779    type Receipt = ReceiptTy<N>;
1780
1781    fn write_state(
1782        &self,
1783        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1784        is_value_known: OriginalValuesKnown,
1785        write_receipts_to: StorageLocation,
1786    ) -> ProviderResult<()> {
1787        let first_block = execution_outcome.first_block();
1788        let block_count = execution_outcome.len() as u64;
1789        let last_block = execution_outcome.last_block();
1790        let block_range = first_block..=last_block;
1791
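        // The tip is the highest block we know about: either the last block already stored or
        // the last block of this execution outcome. It is used below for pruning decisions.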
1792        let tip = self.last_block_number()?.max(last_block);
1793
1794        let (plain_state, reverts) =
1795            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1796
1797        self.write_state_reverts(reverts, first_block)?;
1798        self.write_state_changes(plain_state)?;
1799
1800        // Fetch the first transaction number for each block in the range
1801        let block_indices: Vec<_> = self
1802            .block_body_indices_range(block_range)?
1803            .into_iter()
1804            .map(|b| b.first_tx_num)
1805            .collect();
1806
1807        // Ensure all expected blocks are present.
1808        if block_indices.len() < block_count as usize {
1809            let missing_blocks = block_count - block_indices.len() as u64;
1810            return Err(ProviderError::BlockBodyIndicesNotFound(
1811                last_block.saturating_sub(missing_blocks - 1),
1812            ));
1813        }
1814
1815        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1816
1817        // Prepare receipts cursor if we are going to write receipts to the database
1818        //
1819        // We are writing to database if requested or if there's any kind of receipt pruning
1820        // configured
1821        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1822            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1823            .transpose()?;
1824
1825        // Prepare receipts static writer if we are going to write receipts to static files
1826        //
1827        // We are writing to static files if requested and if there's no receipt pruning configured
1828        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1829            !has_receipts_pruning)
1830            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1831            .transpose()?;
1832
1833        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1834        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1835
1836        // All receipts from the last 128 blocks are required by the blockchain tree, even
1837        // with [`PruneSegment::ContractLogs`].
1838        let prunable_receipts =
1839            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1840
1841        // Prepare the set of addresses whose logs should not be pruned.
1842        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1843        for (_, addresses) in contract_log_pruner.range(..first_block) {
1844            allowed_addresses.extend(addresses.iter().copied());
1845        }
1846
1847        for (idx, (receipts, first_tx_index)) in
1848            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1849        {
1850            let block_number = first_block + idx as u64;
1851
1852            // Increment block number for receipts static file writer
1853            if let Some(writer) = receipts_static_writer.as_mut() {
1854                writer.increment_block(block_number)?;
1855            }
1856
1857            // Skip writing receipts if pruning configuration requires us to.
1858            if prunable_receipts &&
1859                self.prune_modes
1860                    .receipts
1861                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1862            {
1863                continue
1864            }
1865
1866            // If there are new addresses to retain after this block number, track them
1867            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1868                allowed_addresses.extend(new_addresses.iter().copied());
1869            }
1870
1871            for (idx, receipt) in receipts.iter().enumerate() {
1872                let receipt_idx = first_tx_index + idx as u64;
1873                // Skip writing the receipt if the log filter is active and it has no logs to
1874                // retain
1875                if prunable_receipts &&
1876                    has_contract_log_filter &&
1877                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1878                {
1879                    continue
1880                }
1881
1882                if let Some(writer) = &mut receipts_static_writer {
1883                    writer.append_receipt(receipt_idx, receipt)?;
1884                }
1885
1886                if let Some(cursor) = &mut receipts_cursor {
1887                    cursor.append(receipt_idx, receipt)?;
1888                }
1889            }
1890        }
1891
1892        Ok(())
1893    }
1894
1895    fn write_state_reverts(
1896        &self,
1897        reverts: PlainStateReverts,
1898        first_block: BlockNumber,
1899    ) -> ProviderResult<()> {
1900        // Write storage changes
1901        tracing::trace!("Writing storage changes");
1902        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1903        let mut storage_changeset_cursor =
1904            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1905        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1906            let block_number = first_block + block_index as BlockNumber;
1907
1908            tracing::trace!(block_number, "Writing block change");
1909            // sort changes by address.
1910            storage_changes.par_sort_unstable_by_key(|a| a.address);
1911            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1912                let storage_id = BlockNumberAddress((block_number, address));
1913
1914                let mut storage = storage_revert
1915                    .into_iter()
1916                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1917                    .collect::<Vec<_>>();
1918                // sort storage slots by key.
1919                storage.par_sort_unstable_by_key(|a| a.0);
1920
1921                // If we are writing the primary storage wipe transition, the pre-existing plain
1922                // storage state has to be taken from the database and written to storage history.
1923                // See [StorageWipe::Primary] for more details.
1924                let mut wiped_storage = Vec::new();
1925                if wiped {
1926                    tracing::trace!(?address, "Wiping storage");
1927                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1928                        wiped_storage.push((entry.key, entry.value));
1929                        while let Some(entry) = storages_cursor.next_dup_val()? {
1930                            wiped_storage.push((entry.key, entry.value))
1931                        }
1932                    }
1933                }
1934
1935                tracing::trace!(?address, ?storage, "Writing storage reverts");
1936                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1937                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1938                }
1939            }
1940        }
1941
1942        // Write account changes
1943        tracing::trace!("Writing account changes");
1944        let mut account_changeset_cursor =
1945            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1946
1947        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1948            let block_number = first_block + block_index as BlockNumber;
1949            // Sort accounts by address.
1950            account_block_reverts.par_sort_by_key(|a| a.0);
1951
1952            for (address, info) in account_block_reverts {
1953                account_changeset_cursor.append_dup(
1954                    block_number,
1955                    AccountBeforeTx { address, info: info.map(Into::into) },
1956                )?;
1957            }
1958        }
1959
1960        Ok(())
1961    }
1962
1963    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1964        // sort all entries so they can be written to the database in a more performant way
1965        // and with a smaller memory footprint.
1966        changes.accounts.par_sort_by_key(|a| a.0);
1967        changes.storage.par_sort_by_key(|a| a.address);
1968        changes.contracts.par_sort_by_key(|a| a.0);
1969
1970        // Write new account state
1971        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1972        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1973        // write account to database.
1974        for (address, account) in changes.accounts {
1975            if let Some(account) = account {
1976                tracing::trace!(?address, "Updating plain state account");
1977                accounts_cursor.upsert(address, &account.into())?;
1978            } else if accounts_cursor.seek_exact(address)?.is_some() {
1979                tracing::trace!(?address, "Deleting plain state account");
1980                accounts_cursor.delete_current()?;
1981            }
1982        }
1983
1984        // Write bytecode
1985        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1986        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1987        for (hash, bytecode) in changes.contracts {
1988            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1989        }
1990
1991        // Write new storage state and wipe storage if needed.
1992        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1993        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1994        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1995            // Wiping of storage.
1996            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1997                storages_cursor.delete_current_duplicates()?;
1998            }
1999            // cast storages to B256.
2000            let mut storage = storage
2001                .into_iter()
2002                .map(|(k, value)| StorageEntry { key: k.into(), value })
2003                .collect::<Vec<_>>();
2004            // sort storage slots by key.
2005            storage.par_sort_unstable_by_key(|a| a.key);
2006
2007            for entry in storage {
2008                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2009                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2010                    if db_entry.key == entry.key {
2011                        storages_cursor.delete_current()?;
2012                    }
2013                }
2014
2015                if !entry.value.is_zero() {
2016                    storages_cursor.upsert(address, &entry)?;
2017                }
2018            }
2019        }
2020
2021        Ok(())
2022    }
2023
2024    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2025        // Write hashed account updates.
2026        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2027        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2028            if let Some(account) = account {
2029                hashed_accounts_cursor.upsert(hashed_address, &account)?;
2030            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2031                hashed_accounts_cursor.delete_current()?;
2032            }
2033        }
2034
2035        // Write hashed storage changes.
2036        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2037        let mut hashed_storage_cursor =
2038            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2039        for (hashed_address, storage) in sorted_storages {
2040            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2041                hashed_storage_cursor.delete_current_duplicates()?;
2042            }
2043
2044            for (hashed_slot, value) in storage.storage_slots_sorted() {
2045                let entry = StorageEntry { key: hashed_slot, value };
2046                if let Some(db_entry) =
2047                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2048                {
2049                    if db_entry.key == entry.key {
2050                        hashed_storage_cursor.delete_current()?;
2051                    }
2052                }
2053
2054                if !entry.value.is_zero() {
2055                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2056                }
2057            }
2058        }
2059
2060        Ok(())
2061    }
2062
2063    /// Remove the last N blocks of state.
2064    ///
2065    /// The latest state will be unwound
2066    ///
2067    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2068    ///    transaction ids.
2069    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2070    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2071    ///    the changesets.
2072    ///    - In order to have both the old and new values in the changesets, we also access the
2073    ///      plain state tables.
2074    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2075    ///    we:
2076    ///     1. Take the old value from the changeset
2077    ///     2. Take the new value from the plain state
2078    ///     3. Save the old value to the local state
2079    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2080    ///     have seen before, we:
2081    ///     1. Take the old value from the changeset
2082    ///     2. Take the new value from the local state
2083    ///     3. Set the local state to the value in the changeset
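    ///
    /// A minimal usage sketch (illustrative only; the block number and the `StorageLocation`
    /// variant are assumptions about the caller's context):
    ///
    /// ```ignore
    /// // Unwind all state above block 100 and remove the corresponding receipts
    /// // from the database.
    /// provider.remove_state_above(100, StorageLocation::Database)?;
    /// ```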
2084    fn remove_state_above(
2085        &self,
2086        block: BlockNumber,
2087        remove_receipts_from: StorageLocation,
2088    ) -> ProviderResult<()> {
2089        let range = block + 1..=self.last_block_number()?;
2090
2091        if range.is_empty() {
2092            return Ok(());
2093        }
2094
2095        // We are not removing block meta as it is used to get block changesets.
2096        let block_bodies = self.block_body_indices_range(range.clone())?;
2097
2098        // get the first tx number of the range; receipts from it onward will be removed
2099        let from_transaction_num =
2100            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2101
2102        let storage_range = BlockNumberAddress::range(range.clone());
2103
2104        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2105        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2106
2107        // This does not work for blocks that are not at the tip, because the plain state is
2108        // not the state at the end of the range. We should rename these functions or add
2109        // support for accessing historical state. Accessing historical state can be tricky,
2110        // but we would not gain anything from it here.
2111        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2112        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2113
2114        let (state, _) = self.populate_bundle_state(
2115            account_changeset,
2116            storage_changeset,
2117            &mut plain_accounts_cursor,
2118            &mut plain_storage_cursor,
2119        )?;
2120
2121        // iterate over the local plain state and revert all accounts and storage slots.
2122        for (address, (old_account, new_account, storage)) in &state {
2123            // revert account if needed.
2124            if old_account != new_account {
2125                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2126                if let Some(account) = old_account {
2127                    plain_accounts_cursor.upsert(*address, account)?;
2128                } else if existing_entry.is_some() {
2129                    plain_accounts_cursor.delete_current()?;
2130                }
2131            }
2132
2133            // revert storages
2134            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2135                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2136                // delete previous value
2137                // TODO: This does not use dupsort features
2138                if plain_storage_cursor
2139                    .seek_by_key_subkey(*address, *storage_key)?
2140                    .filter(|s| s.key == *storage_key)
2141                    .is_some()
2142                {
2143                    plain_storage_cursor.delete_current()?
2144                }
2145
2146                // insert value if needed
2147                if !old_storage_value.is_zero() {
2148                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2149                }
2150            }
2151        }
2152
2153        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2154
2155        Ok(())
2156    }
2157
2158    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2159    ///
2160    /// The latest state will be unwound and returned back with all the blocks
2161    ///
2162    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2163    ///    transaction ids.
2164    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2165    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2166    ///    the changesets.
2167    ///    - In order to have both the old and new values in the changesets, we also access the
2168    ///      plain state tables.
2169    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2170    ///    we:
2171    ///     1. Take the old value from the changeset
2172    ///     2. Take the new value from the plain state
2173    ///     3. Save the old value to the local state
2174    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2175    ///     have seen before, we:
2176    ///     1. Take the old value from the changeset
2177    ///     2. Take the new value from the local state
2178    ///     3. Set the local state to the value in the changeset
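    ///
    /// A minimal usage sketch (illustrative only; the block number and the `StorageLocation`
    /// variant are assumptions about the caller's context):
    ///
    /// ```ignore
    /// // Unwind all state above block 100 and get it back as an `ExecutionOutcome`,
    /// // e.g. to reinsert it into an in-memory chain.
    /// let outcome = provider.take_state_above(100, StorageLocation::Database)?;
    /// assert_eq!(outcome.first_block(), 101);
    /// ```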
2179    fn take_state_above(
2180        &self,
2181        block: BlockNumber,
2182        remove_receipts_from: StorageLocation,
2183    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2184        let range = block + 1..=self.last_block_number()?;
2185
2186        if range.is_empty() {
2187            return Ok(ExecutionOutcome::default())
2188        }
2189        let start_block_number = *range.start();
2190
2191        // We are not removing block meta as it is used to get block changesets.
2192        let block_bodies = self.block_body_indices_range(range.clone())?;
2193
2194        // get the tx number range of the unwound blocks; used to collect and remove receipts
2195        let from_transaction_num =
2196            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2197        let to_transaction_num =
2198            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2199
2200        let storage_range = BlockNumberAddress::range(range.clone());
2201
2202        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2203        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2204
2205        // This does not work for blocks that are not at the tip, because the plain state is
2206        // not the state at the end of the range. We should rename these functions or add
2207        // support for accessing historical state. Accessing historical state can be tricky,
2208        // but we would not gain anything from it here.
2209        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2210        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2211
2212        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2213        // remove, and return later
2214        let (state, reverts) = self.populate_bundle_state(
2215            account_changeset,
2216            storage_changeset,
2217            &mut plain_accounts_cursor,
2218            &mut plain_storage_cursor,
2219        )?;
2220
2221        // iterate over the local plain state and revert all accounts and storage slots.
2222        for (address, (old_account, new_account, storage)) in &state {
2223            // revert account if needed.
2224            if old_account != new_account {
2225                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2226                if let Some(account) = old_account {
2227                    plain_accounts_cursor.upsert(*address, account)?;
2228                } else if existing_entry.is_some() {
2229                    plain_accounts_cursor.delete_current()?;
2230                }
2231            }
2232
2233            // revert storages
2234            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2235                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2236                // delete previous value
2237                // TODO: This does not use dupsort features
2238                if plain_storage_cursor
2239                    .seek_by_key_subkey(*address, *storage_key)?
2240                    .filter(|s| s.key == *storage_key)
2241                    .is_some()
2242                {
2243                    plain_storage_cursor.delete_current()?
2244                }
2245
2246                // insert value if needed
2247                if !old_storage_value.is_zero() {
2248                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2249                }
2250            }
2251        }
2252
2253        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2254        let mut receipts_iter = self
2255            .static_file_provider
2256            .get_range_with_static_file_or_database(
2257                StaticFileSegment::Receipts,
2258                from_transaction_num..to_transaction_num + 1,
2259                |static_file, range, _| {
2260                    static_file
2261                        .receipts_by_tx_range(range.clone())
2262                        .map(|r| range.into_iter().zip(r).collect())
2263                },
2264                |range, _| {
2265                    self.tx
2266                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2267                        .walk_range(range)?
2268                        .map(|r| r.map_err(Into::into))
2269                        .collect()
2270                },
2271                |_| true,
2272            )?
2273            .into_iter()
2274            .peekable();
2275
2276        let mut receipts = Vec::with_capacity(block_bodies.len());
2277        // Group receipts by block using each body's tx range; pruned receipts are simply absent.
2278        for block_body in block_bodies {
2279            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2280            for num in block_body.tx_num_range() {
2281                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2282                    block_receipts.push(receipts_iter.next().unwrap().1);
2283                }
2284            }
2285            receipts.push(block_receipts);
2286        }
2287
2288        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2289
2290        Ok(ExecutionOutcome::new_init(
2291            state,
2292            reverts,
2293            Vec::new(),
2294            receipts,
2295            start_block_number,
2296            Vec::new(),
2297        ))
2298    }
2299}
2300
2301impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2302    /// Writes trie updates. Returns the number of entries modified.
2303    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2304        if trie_updates.is_empty() {
2305            return Ok(0)
2306        }
2307
2308        // Track the number of inserted entries.
2309        let mut num_entries = 0;
2310
2311        // Merge updated and removed nodes. Updated nodes must take precedence.
2312        let mut account_updates = trie_updates
2313            .removed_nodes_ref()
2314            .iter()
2315            .filter_map(|n| {
2316                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2317            })
2318            .collect::<Vec<_>>();
2319        account_updates.extend(
2320            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2321        );
2322        // Sort trie node updates.
2323        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2324
2325        let tx = self.tx_ref();
2326        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2327        for (key, updated_node) in account_updates {
2328            let nibbles = StoredNibbles(key.clone());
2329            match updated_node {
2330                Some(node) => {
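                    // The empty path corresponds to the trie root, which is never upserted here;
                    // only non-root nodes are persisted to this table.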
2331                    if !nibbles.0.is_empty() {
2332                        num_entries += 1;
2333                        account_trie_cursor.upsert(nibbles, node)?;
2334                    }
2335                }
2336                None => {
2337                    num_entries += 1;
2338                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2339                        account_trie_cursor.delete_current()?;
2340                    }
2341                }
2342            }
2343        }
2344
2345        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2346
2347        Ok(num_entries)
2348    }
2349}
2350
2351impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2352    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2353    /// updates by the hashed address, writing in sorted order.
2354    fn write_storage_trie_updates(
2355        &self,
2356        storage_tries: &B256Map<StorageTrieUpdates>,
2357    ) -> ProviderResult<usize> {
2358        let mut num_entries = 0;
2359        let mut storage_tries = Vec::from_iter(storage_tries);
2360        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2361        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2362        for (hashed_address, storage_trie_updates) in storage_tries {
2363            let mut db_storage_trie_cursor =
2364                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2365            num_entries +=
2366                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2367            cursor = db_storage_trie_cursor.cursor;
2368        }
2369
2370        Ok(num_entries)
2371    }
2372
2373    fn write_individual_storage_trie_updates(
2374        &self,
2375        hashed_address: B256,
2376        updates: &StorageTrieUpdates,
2377    ) -> ProviderResult<usize> {
2378        if updates.is_empty() {
2379            return Ok(0)
2380        }
2381
2382        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2383        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2384        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2385    }
2386}
2387
2388impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2389    fn unwind_account_hashing<'a>(
2390        &self,
2391        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2392    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2393        // Aggregate all block changesets and make a list of accounts that have been changed.
2394        // Note that collecting and then reversing the order is necessary to ensure that the
2395        // changes are applied in the correct order.
2396        let hashed_accounts = changesets
2397            .into_iter()
2398            .map(|(_, e)| (keccak256(e.address), e.info))
2399            .collect::<Vec<_>>()
2400            .into_iter()
2401            .rev()
2402            .collect::<BTreeMap<_, _>>();
2403
2404        // Apply values to HashedState, and remove the account if it's None.
2405        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2406        for (hashed_address, account) in &hashed_accounts {
2407            if let Some(account) = account {
2408                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2409            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2410                hashed_accounts_cursor.delete_current()?;
2411            }
2412        }
2413
2414        Ok(hashed_accounts)
2415    }
2416
2417    fn unwind_account_hashing_range(
2418        &self,
2419        range: impl RangeBounds<BlockNumber>,
2420    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2421        let changesets = self
2422            .tx
2423            .cursor_read::<tables::AccountChangeSets>()?
2424            .walk_range(range)?
2425            .collect::<Result<Vec<_>, _>>()?;
2426        self.unwind_account_hashing(changesets.iter())
2427    }
2428
2429    fn insert_account_for_hashing(
2430        &self,
2431        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2432    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2433        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2434        let hashed_accounts =
2435            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2436        for (hashed_address, account) in &hashed_accounts {
2437            if let Some(account) = account {
2438                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2439            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2440                hashed_accounts_cursor.delete_current()?;
2441            }
2442        }
2443        Ok(hashed_accounts)
2444    }
2445
2446    fn unwind_storage_hashing(
2447        &self,
2448        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2449    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2450        // Aggregate all block changesets and make a list of storage entries that changed.
2451        let mut hashed_storages = changesets
2452            .into_iter()
2453            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2454                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2455            })
2456            .collect::<Vec<_>>();
2457        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2458
2459        // Apply values to the hashed storage table, removing entries whose value is zero.
2460        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2461            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2462        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2463        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2464            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2465
2466            if hashed_storage
2467                .seek_by_key_subkey(hashed_address, key)?
2468                .filter(|entry| entry.key == key)
2469                .is_some()
2470            {
2471                hashed_storage.delete_current()?;
2472            }
2473
2474            if !value.is_zero() {
2475                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2476            }
2477        }
2478        Ok(hashed_storage_keys)
2479    }
2480
2481    fn unwind_storage_hashing_range(
2482        &self,
2483        range: impl RangeBounds<BlockNumberAddress>,
2484    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2485        let changesets = self
2486            .tx
2487            .cursor_read::<tables::StorageChangeSets>()?
2488            .walk_range(range)?
2489            .collect::<Result<Vec<_>, _>>()?;
2490        self.unwind_storage_hashing(changesets.into_iter())
2491    }
2492
2493    fn insert_storage_for_hashing(
2494        &self,
2495        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2496    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2497        // hash values
2498        let hashed_storages =
2499            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2500                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2501                    map.insert(keccak256(entry.key), entry.value);
2502                    map
2503                });
2504                map.insert(keccak256(address), storage);
2505                map
2506            });
2507
2508        let hashed_storage_keys = hashed_storages
2509            .iter()
2510            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2511            .collect();
2512
2513        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2514        // Apply the hashed addresses and keys to the hashed storage table (entries with a
2515        // zero value are removed rather than inserted).
2516        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2517            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2518                if hashed_storage_cursor
2519                    .seek_by_key_subkey(hashed_address, key)?
2520                    .filter(|entry| entry.key == key)
2521                    .is_some()
2522                {
2523                    hashed_storage_cursor.delete_current()?;
2524                }
2525
2526                if !value.is_zero() {
2527                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2528                }
2529                Ok(())
2530            })
2531        })?;
2532
2533        Ok(hashed_storage_keys)
2534    }
2535
2536    fn insert_hashes(
2537        &self,
2538        range: RangeInclusive<BlockNumber>,
2539        end_block_hash: B256,
2540        expected_state_root: B256,
2541    ) -> ProviderResult<()> {
2542        // Initialize prefix sets.
2543        let mut account_prefix_set = PrefixSetMut::default();
2544        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2545        let mut destroyed_accounts = HashSet::default();
2546
2547        let mut durations_recorder = metrics::DurationsRecorder::default();
2548
2549        // storage hashing stage
2550        {
2551            let lists = self.changed_storages_with_range(range.clone())?;
2552            let storages = self.plain_state_storages(lists)?;
2553            let storage_entries = self.insert_storage_for_hashing(storages)?;
2554            for (hashed_address, hashed_slots) in storage_entries {
2555                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2556                for slot in hashed_slots {
2557                    storage_prefix_sets
2558                        .entry(hashed_address)
2559                        .or_default()
2560                        .insert(Nibbles::unpack(slot));
2561                }
2562            }
2563        }
2564        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2565
2566        // account hashing stage
2567        {
2568            let lists = self.changed_accounts_with_range(range.clone())?;
2569            let accounts = self.basic_accounts(lists)?;
2570            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2571            for (hashed_address, account) in hashed_addresses {
2572                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2573                if account.is_none() {
2574                    destroyed_accounts.insert(hashed_address);
2575                }
2576            }
2577        }
2578        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2579
2580        // merkle tree
2581        {
2582            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2583            // are pre-loaded.
2584            let prefix_sets = TriePrefixSets {
2585                account_prefix_set: account_prefix_set.freeze(),
2586                storage_prefix_sets: storage_prefix_sets
2587                    .into_iter()
2588                    .map(|(k, v)| (k, v.freeze()))
2589                    .collect(),
2590                destroyed_accounts,
2591            };
2592            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2593                .with_prefix_sets(prefix_sets)
2594                .root_with_updates()
2595                .map_err(reth_db_api::DatabaseError::from)?;
2596            if state_root != expected_state_root {
2597                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2598                    root: GotExpected { got: state_root, expected: expected_state_root },
2599                    block_number: *range.end(),
2600                    block_hash: end_block_hash,
2601                })))
2602            }
2603            self.write_trie_updates(&trie_updates)?;
2604        }
2605        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2606
2607        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2608
2609        Ok(())
2610    }
2611}
2612
2613impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2614    fn unwind_account_history_indices<'a>(
2615        &self,
2616        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2617    ) -> ProviderResult<usize> {
2618        let mut last_indices = changesets
2619            .into_iter()
2620            .map(|(index, account)| (account.address, *index))
2621            .collect::<Vec<_>>();
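        // Sort by address so all history shards for the same account are handled together.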
2622        last_indices.sort_by_key(|(a, _)| *a);
2623
2624        // Unwind the account history index.
2625        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2626        for &(address, rem_index) in &last_indices {
2627            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2628                &mut cursor,
2629                ShardedKey::last(address),
2630                rem_index,
2631                |sharded_key| sharded_key.key == address,
2632            )?;
2633
2634            // Check the last returned partial shard.
2635            // If it's not empty, the shard needs to be reinserted.
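            // (It contains the block numbers below the unwind point, which must be kept.)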
2636            if !partial_shard.is_empty() {
2637                cursor.insert(
2638                    ShardedKey::last(address),
2639                    &BlockNumberList::new_pre_sorted(partial_shard),
2640                )?;
2641            }
2642        }
2643
2644        let changesets = last_indices.len();
2645        Ok(changesets)
2646    }
2647
2648    fn unwind_account_history_indices_range(
2649        &self,
2650        range: impl RangeBounds<BlockNumber>,
2651    ) -> ProviderResult<usize> {
2652        let changesets = self
2653            .tx
2654            .cursor_read::<tables::AccountChangeSets>()?
2655            .walk_range(range)?
2656            .collect::<Result<Vec<_>, _>>()?;
2657        self.unwind_account_history_indices(changesets.iter())
2658    }
2659
2660    fn insert_account_history_index(
2661        &self,
2662        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2663    ) -> ProviderResult<()> {
2664        self.append_history_index::<_, tables::AccountsHistory>(
2665            account_transitions,
2666            ShardedKey::new,
2667        )
2668    }
2669
2670    fn unwind_storage_history_indices(
2671        &self,
2672        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2673    ) -> ProviderResult<usize> {
2674        let mut storage_changesets = changesets
2675            .into_iter()
2676            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2677            .collect::<Vec<_>>();
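        // Sort by (address, slot) so all shards for the same storage slot are handled together.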
2678        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2679
2680        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2681        for &(address, storage_key, rem_index) in &storage_changesets {
2682            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2683                &mut cursor,
2684                StorageShardedKey::last(address, storage_key),
2685                rem_index,
2686                |storage_sharded_key| {
2687                    storage_sharded_key.address == address &&
2688                        storage_sharded_key.sharded_key.key == storage_key
2689                },
2690            )?;
2691
2692            // Check the last returned partial shard.
2693            // If it's not empty, the shard needs to be reinserted.
2694            if !partial_shard.is_empty() {
2695                cursor.insert(
2696                    StorageShardedKey::last(address, storage_key),
2697                    &BlockNumberList::new_pre_sorted(partial_shard),
2698                )?;
2699            }
2700        }
2701
2702        let changesets = storage_changesets.len();
2703        Ok(changesets)
2704    }
2705
2706    fn unwind_storage_history_indices_range(
2707        &self,
2708        range: impl RangeBounds<BlockNumberAddress>,
2709    ) -> ProviderResult<usize> {
2710        let changesets = self
2711            .tx
2712            .cursor_read::<tables::StorageChangeSets>()?
2713            .walk_range(range)?
2714            .collect::<Result<Vec<_>, _>>()?;
2715        self.unwind_storage_history_indices(changesets.into_iter())
2716    }
2717
2718    fn insert_storage_history_index(
2719        &self,
2720        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2721    ) -> ProviderResult<()> {
2722        self.append_history_index::<_, tables::StoragesHistory>(
2723            storage_transitions,
2724            |(address, storage_key), highest_block_number| {
2725                StorageShardedKey::new(address, storage_key, highest_block_number)
2726            },
2727        )
2728    }
2729
2730    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2731        // account history stage
2732        {
2733            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2734            self.insert_account_history_index(indices)?;
2735        }
2736
2737        // storage history stage
2738        {
2739            let indices = self.changed_storages_and_blocks_with_range(range)?;
2740            self.insert_storage_history_index(indices)?;
2741        }
2742
2743        Ok(())
2744    }
2745}
2746
2747impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2748    for DatabaseProvider<TX, N>
2749{
2750    fn take_block_and_execution_above(
2751        &self,
2752        block: BlockNumber,
2753        remove_from: StorageLocation,
2754    ) -> ProviderResult<Chain<Self::Primitives>> {
2755        let range = block + 1..=self.last_block_number()?;
2756
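        // Unwind the trie state first: it reads the account and storage changesets for this
        // range, which `take_state_above` below will remove.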
2757        self.unwind_trie_state_range(range.clone())?;
2758
2759        // get execution res
2760        let execution_state = self.take_state_above(block, remove_from)?;
2761
2762        let blocks = self.recovered_block_range(range)?;
2763
2764        // The block bodies are needed both for fetching the block range and for the block
2765        // execution results, which is why they are removed only afterwards.
2766        self.remove_blocks_above(block, remove_from)?;
2767
2768        // Update pipeline progress
2769        self.update_pipeline_stages(block, true)?;
2770
2771        Ok(Chain::new(blocks, execution_state, None))
2772    }
2773
2774    fn remove_block_and_execution_above(
2775        &self,
2776        block: BlockNumber,
2777        remove_from: StorageLocation,
2778    ) -> ProviderResult<()> {
2779        let range = block + 1..=self.last_block_number()?;
2780
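        // Unwind the trie state first: it reads the account and storage changesets for this
        // range, which `remove_state_above` below will delete.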
2781        self.unwind_trie_state_range(range)?;
2782
2783        // remove execution res
2784        self.remove_state_above(block, remove_from)?;
2785
2786        // The block bodies are still needed while removing the execution state above, which
2787        // is why they are removed only afterwards.
2788        self.remove_blocks_above(block, remove_from)?;
2789
2790        // Update pipeline progress
2791        self.update_pipeline_stages(block, true)?;
2792
2793        Ok(())
2794    }
2795}
2796
2797impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2798    for DatabaseProvider<TX, N>
2799{
2800    type Block = BlockTy<N>;
2801    type Receipt = ReceiptTy<N>;
2802
2803    /// Inserts the block into the database, always modifying the following tables:
2804    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2805    /// * [`Headers`](tables::Headers)
2806    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2807    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2808    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2809    ///
2810    /// If there are transactions in the block, the following tables will be modified:
2811    /// * [`Transactions`](tables::Transactions)
2812    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2813    ///
2814    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2815    /// If withdrawals are not empty, this will modify
2816    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2817    ///
2818    /// If the provider has __not__ configured full sender pruning, this will modify
2819    /// [`TransactionSenders`](tables::TransactionSenders).
2820    ///
2821    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2822    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
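    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `provider` is a read-write
    /// `DatabaseProvider` and `block` is a `RecoveredBlock` obtained elsewhere:
    ///
    /// ```ignore
    /// // Persist the block to the database tables only; static files are left untouched.
    /// let indices = provider.insert_block(block, StorageLocation::Database)?;
    /// // The returned indices describe where the block's transactions were stored.
    /// println!("first tx: {}, tx count: {}", indices.first_tx_num, indices.tx_count);
    /// ```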
2823    fn insert_block(
2824        &self,
2825        block: RecoveredBlock<Self::Block>,
2826        write_to: StorageLocation,
2827    ) -> ProviderResult<StoredBlockBodyIndices> {
2828        let block_number = block.number();
2829
2830        let mut durations_recorder = metrics::DurationsRecorder::default();
2831
2832        // total difficulty
2833        let ttd = if block_number == 0 {
2834            block.header().difficulty()
2835        } else {
2836            let parent_block_number = block_number - 1;
2837            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2838            durations_recorder.record_relative(metrics::Action::GetParentTD);
2839            parent_ttd + block.header().difficulty()
2840        };
2841
2842        if write_to.database() {
2843            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2844            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2845
2846            // Put header with canonical hashes.
2847            self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2848            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2849
2850            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2851            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2852        }
2853
2854        if write_to.static_files() {
2855            let mut writer =
2856                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2857            writer.append_header(block.header(), ttd, &block.hash())?;
2858        }
2859
2860        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2861        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2862
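        // The next tx number is one past the highest entry in `TransactionBlocks`, or zero if
        // the table is empty.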
2863        let mut next_tx_num = self
2864            .tx
2865            .cursor_read::<tables::TransactionBlocks>()?
2866            .last()?
2867            .map(|(n, _)| n + 1)
2868            .unwrap_or_default();
2869        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2870        let first_tx_num = next_tx_num;
2871
2872        let tx_count = block.body().transaction_count() as u64;
2873
2874        // Write the senders and hash lookups for the block's transactions, honoring prune modes.
2875        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2876            let hash = transaction.tx_hash();
2877
2878            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2879                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2880            }
2881
2882            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2883                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2884            }
2885            next_tx_num += 1;
2886        }
2887
2888        self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2889
2890        debug!(
2891            target: "providers::db",
2892            ?block_number,
2893            actions = ?durations_recorder.actions,
2894            "Inserted block"
2895        );
2896
2897        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2898    }
2899
2900    fn append_block_bodies(
2901        &self,
2902        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2903        write_to: StorageLocation,
2904    ) -> ProviderResult<()> {
2905        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2906
2907        // Initialize writer if we will be writing transactions to static files
2908        let mut tx_static_writer = write_to
2909            .static_files()
2910            .then(|| {
2911                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2912            })
2913            .transpose()?;
2914
2915        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2916        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2917
2918        // Initialize cursor if we will be writing transactions to database
2919        let mut tx_cursor = write_to
2920            .database()
2921            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2922            .transpose()?;
2923
2924        // Get id for the next tx_num or zero if there are no transactions.
2925        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2926
2927        for (block_number, body) in &bodies {
2928            // Increment block on static file header.
2929            if let Some(writer) = tx_static_writer.as_mut() {
2930                writer.increment_block(*block_number)?;
2931            }
2932
2933            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2934            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2935
2936            let mut durations_recorder = metrics::DurationsRecorder::default();
2937
2938            // insert block meta
2939            block_indices_cursor.append(*block_number, &block_indices)?;
2940
2941            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2942
2943            let Some(body) = body else { continue };
2944
2945            // write transaction block index
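            // (`TransactionBlocks` maps the last tx number of a block to that block's number.)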
2946            if !body.transactions().is_empty() {
2947                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2948                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2949            }
2950
2951            // write transactions
2952            for transaction in body.transactions() {
2953                if let Some(writer) = tx_static_writer.as_mut() {
2954                    writer.append_transaction(next_tx_num, transaction)?;
2955                }
2956                if let Some(cursor) = tx_cursor.as_mut() {
2957                    cursor.append(next_tx_num, transaction)?;
2958                }
2959
2960                // Increment transaction id for each transaction.
2961                next_tx_num += 1;
2962            }
2963
2964            debug!(
2965                target: "providers::db",
2966                ?block_number,
2967                actions = ?durations_recorder.actions,
2968                "Inserted block body"
2969            );
2970        }
2971
2972        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2973
2974        Ok(())
2975    }
2976
2977    fn remove_blocks_above(
2978        &self,
2979        block: BlockNumber,
2980        remove_from: StorageLocation,
2981    ) -> ProviderResult<()> {
2982        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2983            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2984        }
2985
2986        // Only prune canonical headers after we've removed the block hashes as we rely on data from
2987        // this table in `canonical_hashes_range`.
2988        self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2989        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2990        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2991
2992        // First transaction to be removed
2993        let unwind_tx_from = self
2994            .block_body_indices(block)?
2995            .map(|b| b.next_tx_num())
2996            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2997
2998        // Last transaction to be removed
2999        let unwind_tx_to = self
3000            .tx
3001            .cursor_read::<tables::BlockBodyIndices>()?
3002            .last()?
3003            // Shouldn't happen: the lookup above succeeded, so the table is non-empty
3004            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3005            .1
3006            .last_tx_num();
3007
3008        if unwind_tx_from <= unwind_tx_to {
3009            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3010                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3011            }
3012        }
3013
3014        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3015
3016        self.remove_bodies_above(block, remove_from)?;
3017
3018        Ok(())
3019    }
3020
3021    fn remove_bodies_above(
3022        &self,
3023        block: BlockNumber,
3024        remove_from: StorageLocation,
3025    ) -> ProviderResult<()> {
3026        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3027
3028        // First transaction to be removed
3029        let unwind_tx_from = self
3030            .block_body_indices(block)?
3031            .map(|b| b.next_tx_num())
3032            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3033
3034        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3035        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3036
3037        if remove_from.database() {
3038            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3039        }
3040
3041        if remove_from.static_files() {
3042            let static_file_tx_num = self
3043                .static_file_provider
3044                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3045
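            // Number of transactions currently stored in static files above the unwind point;
            // zero if the segment has no transactions yet.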
3046            let to_delete = static_file_tx_num
3047                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3048                .unwrap_or_default();
3049
3050            self.static_file_provider
3051                .latest_writer(StaticFileSegment::Transactions)?
3052                .prune_transactions(to_delete, block)?;
3053        }
3054
3055        Ok(())
3056    }
3057
3058    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3059    fn append_blocks_with_state(
3060        &self,
3061        blocks: Vec<RecoveredBlock<Self::Block>>,
3062        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3063        hashed_state: HashedPostStateSorted,
3064        trie_updates: TrieUpdates,
3065    ) -> ProviderResult<()> {
3066        if blocks.is_empty() {
3067            debug!(target: "providers::db", "Attempted to append empty block range");
3068            return Ok(())
3069        }
3070
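        // Writes happen in dependency order: blocks, then execution state, then hashed state
        // and trie updates, then history indices, and finally the pipeline checkpoints.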
3071        let first_number = blocks.first().unwrap().number();
3072
3073        let last = blocks.last().unwrap();
3074        let last_block_number = last.number();
3075
3076        let mut durations_recorder = metrics::DurationsRecorder::default();
3077
3078        // Insert the blocks
3079        for block in blocks {
3080            self.insert_block(block, StorageLocation::Database)?;
3081            durations_recorder.record_relative(metrics::Action::InsertBlock);
3082        }
3083
3084        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3085        durations_recorder.record_relative(metrics::Action::InsertState);
3086
3087        // insert hashes and intermediate merkle nodes
3088        self.write_hashed_state(&hashed_state)?;
3089        self.write_trie_updates(&trie_updates)?;
3090        durations_recorder.record_relative(metrics::Action::InsertHashes);
3091
3092        self.update_history_indices(first_number..=last_block_number)?;
3093        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3094
3095        // Update pipeline progress
3096        self.update_pipeline_stages(last_block_number, false)?;
3097        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3098
3099        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3100
3101        Ok(())
3102    }
3103}
3104
3105impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3106    fn get_prune_checkpoint(
3107        &self,
3108        segment: PruneSegment,
3109    ) -> ProviderResult<Option<PruneCheckpoint>> {
3110        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3111    }
3112
3113    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3114        Ok(self
3115            .tx
3116            .cursor_read::<tables::PruneCheckpoints>()?
3117            .walk(None)?
3118            .collect::<Result<_, _>>()?)
3119    }
3120}
3121
3122impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3123    fn save_prune_checkpoint(
3124        &self,
3125        segment: PruneSegment,
3126        checkpoint: PruneCheckpoint,
3127    ) -> ProviderResult<()> {
3128        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3129    }
3130}
3131
3132impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3133    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3134        let db_entries = self.tx.entries::<T>()?;
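        // Tables without a static-file counterpart report `UnsupportedProvider`; count those as
        // zero static-file entries.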
3135        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3136            Ok(entries) => entries,
3137            Err(ProviderError::UnsupportedProvider) => 0,
3138            Err(err) => return Err(err),
3139        };
3140
3141        Ok(db_entries + static_file_entries)
3142    }
3143}
3144
3145impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3146    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3147        let mut finalized_blocks = self
3148            .tx
3149            .cursor_read::<tables::ChainState>()?
3150            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3151            .take(1)
3152            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3153
3154        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3155        Ok(last_finalized_block_number)
3156    }
3157
3158    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3159        let mut safe_blocks = self
3160            .tx
3161            .cursor_read::<tables::ChainState>()?
3162            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3163            .take(1)
3164            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3165
3166        let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3167        Ok(last_safe_block_number)
3168    }
3169}
3170
3171impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3172    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3173        Ok(self
3174            .tx
3175            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3176    }
3177
3178    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3179        Ok(self
3180            .tx
3181            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3182    }
3183}
3184
3185impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3186    type Tx = TX;
3187
3188    fn tx_ref(&self) -> &Self::Tx {
3189        &self.tx
3190    }
3191
3192    fn tx_mut(&mut self) -> &mut Self::Tx {
3193        &mut self.tx
3194    }
3195
3196    fn into_tx(self) -> Self::Tx {
3197        self.tx
3198    }
3199
3200    fn prune_modes_ref(&self) -> &PruneModes {
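        // Resolves to the inherent `prune_modes_ref` defined on `DatabaseProvider` (inherent
        // methods take precedence over trait methods), so this call does not recurse.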
3201        self.prune_modes_ref()
3202    }
3203}