reth_provider/providers/database/
provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
18    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
19    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
20    WithdrawalsProvider,
21};
22use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt};
23use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
24use alloy_primitives::{
25    keccak256,
26    map::{hash_map, B256Map, HashMap, HashSet},
27    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
28};
29use itertools::Itertools;
30use rayon::slice::ParallelSliceMut;
31use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
32use reth_db_api::{
33    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
34    database::Database,
35    models::{
36        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
37        ShardedKey, StoredBlockBodyIndices,
38    },
39    table::Table,
40    tables,
41    transaction::{DbTx, DbTxMut},
42    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
43};
44use reth_execution_types::{Chain, ExecutionOutcome};
45use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
46use reth_primitives_traits::{
47    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
48    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
49};
50use reth_prune_types::{
51    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
52};
53use reth_stages_types::{StageCheckpoint, StageId};
54use reth_static_file_types::StaticFileSegment;
55use reth_storage_api::{
56    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
57    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
58};
59use reth_storage_errors::provider::{ProviderResult, RootMismatch};
60use reth_trie::{
61    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
62    updates::{StorageTrieUpdates, TrieUpdates},
63    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
64};
65use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
66use revm_database::states::{
67    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
68};
69use std::{
70    cmp::Ordering,
71    collections::{BTreeMap, BTreeSet},
72    fmt::Debug,
73    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
74    sync::{mpsc, Arc},
75};
76use tracing::{debug, trace};
77
78/// A [`DatabaseProvider`] that holds a read-only database transaction.
79pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
80
81/// A [`DatabaseProvider`] that holds a read-write database transaction.
82///
83/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
84/// Once that issue is solved, we can probably revert back to being an alias type.
85#[derive(Debug)]
86pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
87    pub DatabaseProvider<<DB as Database>::TXMut, N>,
88);
89
90impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
91    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
92
93    fn deref(&self) -> &Self::Target {
94        &self.0
95    }
96}
97
98impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
99    fn deref_mut(&mut self) -> &mut Self::Target {
100        &mut self.0
101    }
102}
103
104impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
105    for DatabaseProviderRW<DB, N>
106{
107    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
108        &self.0
109    }
110}
111
112impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
113    /// Commit database transaction and static file if it exists.
114    pub fn commit(self) -> ProviderResult<bool> {
115        self.0.commit()
116    }
117
118    /// Consume `DbTx` or `DbTxMut`.
119    pub fn into_tx(self) -> <DB as Database>::TXMut {
120        self.0.into_tx()
121    }
122}
123
124impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
125    for DatabaseProvider<<DB as Database>::TXMut, N>
126{
127    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
128        provider.0
129    }
130}
131
132/// A provider struct that fetches data from the database.
133/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
134#[derive(Debug)]
135pub struct DatabaseProvider<TX, N: NodeTypes> {
136    /// Database transaction.
137    tx: TX,
138    /// Chain spec
139    chain_spec: Arc<N::ChainSpec>,
140    /// Static File provider
141    static_file_provider: StaticFileProvider<N::Primitives>,
142    /// Pruning configuration
143    prune_modes: PruneModes,
144    /// Node storage handler.
145    storage: Arc<N::Storage>,
146}
147
148impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
149    /// Returns reference to prune modes.
150    pub const fn prune_modes_ref(&self) -> &PruneModes {
151        &self.prune_modes
152    }
153}
154
155impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
156    /// State provider for latest state
157    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
158        trace!(target: "providers::db", "Returning latest state provider");
159        Box::new(LatestStateProviderRef::new(self))
160    }
161
162    /// Storage provider for state at that given block hash
163    pub fn history_by_block_hash<'a>(
164        &'a self,
165        block_hash: BlockHash,
166    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
167        let mut block_number =
168            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
169        if block_number == self.best_block_number().unwrap_or_default() &&
170            block_number == self.last_block_number().unwrap_or_default()
171        {
172            return Ok(Box::new(LatestStateProviderRef::new(self)))
173        }
174
175        // +1 as the changeset that we want is the one that was applied after this block.
176        block_number += 1;
177
178        let account_history_prune_checkpoint =
179            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
180        let storage_history_prune_checkpoint =
181            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
182
183        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
184
185        // If we pruned account or storage history, we can't return state on every historical block.
186        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
187        if let Some(prune_checkpoint_block_number) =
188            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
189        {
190            state_provider = state_provider.with_lowest_available_account_history_block_number(
191                prune_checkpoint_block_number + 1,
192            );
193        }
194        if let Some(prune_checkpoint_block_number) =
195            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
196        {
197            state_provider = state_provider.with_lowest_available_storage_history_block_number(
198                prune_checkpoint_block_number + 1,
199            );
200        }
201
202        Ok(Box::new(state_provider))
203    }
204
205    #[cfg(feature = "test-utils")]
206    /// Sets the prune modes for provider.
207    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
208        self.prune_modes = prune_modes;
209    }
210}
211
212impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
213    type Primitives = N::Primitives;
214}
215
216impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
217    /// Returns a static file provider
218    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
219        self.static_file_provider.clone()
220    }
221}
222
223impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
224    for DatabaseProvider<TX, N>
225{
226    type ChainSpec = N::ChainSpec;
227
228    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
229        self.chain_spec.clone()
230    }
231}
232
233impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
234    /// Creates a provider with an inner read-write transaction.
235    pub const fn new_rw(
236        tx: TX,
237        chain_spec: Arc<N::ChainSpec>,
238        static_file_provider: StaticFileProvider<N::Primitives>,
239        prune_modes: PruneModes,
240        storage: Arc<N::Storage>,
241    ) -> Self {
242        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
243    }
244}
245
246impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
247    fn as_ref(&self) -> &Self {
248        self
249    }
250}
251
252impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
253    /// Unwinds trie state for the given range.
254    ///
255    /// This includes calculating the resulted state root and comparing it with the parent block
256    /// state root.
257    pub fn unwind_trie_state_range(
258        &self,
259        range: RangeInclusive<BlockNumber>,
260    ) -> ProviderResult<()> {
261        let changed_accounts = self
262            .tx
263            .cursor_read::<tables::AccountChangeSets>()?
264            .walk_range(range.clone())?
265            .collect::<Result<Vec<_>, _>>()?;
266
267        // Unwind account hashes. Add changed accounts to account prefix set.
268        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
269        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
270        let mut destroyed_accounts = HashSet::default();
271        for (hashed_address, account) in hashed_addresses {
272            account_prefix_set.insert(Nibbles::unpack(hashed_address));
273            if account.is_none() {
274                destroyed_accounts.insert(hashed_address);
275            }
276        }
277
278        // Unwind account history indices.
279        self.unwind_account_history_indices(changed_accounts.iter())?;
280        let storage_range = BlockNumberAddress::range(range.clone());
281
282        let changed_storages = self
283            .tx
284            .cursor_read::<tables::StorageChangeSets>()?
285            .walk_range(storage_range)?
286            .collect::<Result<Vec<_>, _>>()?;
287
288        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
289        // sets.
290        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
291        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
292        for (hashed_address, hashed_slots) in storage_entries {
293            account_prefix_set.insert(Nibbles::unpack(hashed_address));
294            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
295            for slot in hashed_slots {
296                storage_prefix_set.insert(Nibbles::unpack(slot));
297            }
298            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
299        }
300
301        // Unwind storage history indices.
302        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
303
304        // Calculate the reverted merkle root.
305        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
306        // are pre-loaded.
307        let prefix_sets = TriePrefixSets {
308            account_prefix_set: account_prefix_set.freeze(),
309            storage_prefix_sets,
310            destroyed_accounts,
311        };
312        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
313            .with_prefix_sets(prefix_sets)
314            .root_with_updates()
315            .map_err(reth_db_api::DatabaseError::from)?;
316
317        let parent_number = range.start().saturating_sub(1);
318        let parent_state_root = self
319            .header_by_number(parent_number)?
320            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
321            .state_root();
322
323        // state root should be always correct as we are reverting state.
324        // but for sake of double verification we will check it again.
325        if new_state_root != parent_state_root {
326            let parent_hash = self
327                .block_hash(parent_number)?
328                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
329            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
330                root: GotExpected { got: new_state_root, expected: parent_state_root },
331                block_number: parent_number,
332                block_hash: parent_hash,
333            })))
334        }
335        self.write_trie_updates(&trie_updates)?;
336
337        Ok(())
338    }
339
340    /// Removes receipts from all transactions starting with provided number (inclusive).
341    fn remove_receipts_from(
342        &self,
343        from_tx: TxNumber,
344        last_block: BlockNumber,
345        remove_from: StorageLocation,
346    ) -> ProviderResult<()> {
347        if remove_from.database() {
348            // iterate over block body and remove receipts
349            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
350        }
351
352        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
353            let static_file_receipt_num =
354                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
355
356            let to_delete = static_file_receipt_num
357                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
358                .unwrap_or_default();
359
360            self.static_file_provider
361                .latest_writer(StaticFileSegment::Receipts)?
362                .prune_receipts(to_delete, last_block)?;
363        }
364
365        Ok(())
366    }
367}
368
369impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
370    fn try_into_history_at_block(
371        self,
372        mut block_number: BlockNumber,
373    ) -> ProviderResult<StateProviderBox> {
374        // if the block number is the same as the currently best block number on disk we can use the
375        // latest state provider here
376        if block_number == self.best_block_number().unwrap_or_default() {
377            return Ok(Box::new(LatestStateProvider::new(self)))
378        }
379
380        // +1 as the changeset that we want is the one that was applied after this block.
381        block_number += 1;
382
383        let account_history_prune_checkpoint =
384            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
385        let storage_history_prune_checkpoint =
386            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
387
388        let mut state_provider = HistoricalStateProvider::new(self, block_number);
389
390        // If we pruned account or storage history, we can't return state on every historical block.
391        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
392        if let Some(prune_checkpoint_block_number) =
393            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
394        {
395            state_provider = state_provider.with_lowest_available_account_history_block_number(
396                prune_checkpoint_block_number + 1,
397            );
398        }
399        if let Some(prune_checkpoint_block_number) =
400            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
401        {
402            state_provider = state_provider.with_lowest_available_storage_history_block_number(
403                prune_checkpoint_block_number + 1,
404            );
405        }
406
407        Ok(Box::new(state_provider))
408    }
409}
410
411impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
412    type StateCommitment = N::StateCommitment;
413}
414
415impl<
416        Tx: DbTx + DbTxMut + 'static,
417        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
418    > DatabaseProvider<Tx, N>
419{
420    // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
421    // #[cfg(any(test, feature = "test-utils"))]
422    /// Inserts an historical block. **Used for setting up test environments**
423    pub fn insert_historical_block(
424        &self,
425        block: RecoveredBlock<<Self as BlockWriter>::Block>,
426    ) -> ProviderResult<StoredBlockBodyIndices> {
427        let ttd = if block.number() == 0 {
428            block.header().difficulty()
429        } else {
430            let parent_block_number = block.number() - 1;
431            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
432            parent_ttd + block.header().difficulty()
433        };
434
435        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
436
437        // Backfill: some tests start at a forward block number, but static files require no gaps.
438        let segment_header = writer.user_header();
439        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
440            for block_number in 0..block.number() {
441                let mut prev = block.clone_header();
442                prev.number = block_number;
443                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
444            }
445        }
446
447        writer.append_header(block.header(), ttd, &block.hash())?;
448
449        self.insert_block(block, StorageLocation::Database)
450    }
451}
452
453/// For a given key, unwind all history shards that are below the given block number.
454///
455/// S - Sharded key subtype.
456/// T - Table to walk over.
457/// C - Cursor implementation.
458///
459/// This function walks the entries from the given start key and deletes all shards that belong to
460/// the key and are below the given block number.
461///
462/// The boundary shard (the shard is split by the block number) is removed from the database. Any
463/// indices that are above the block number are filtered out. The boundary shard is returned for
464/// reinsertion (if it's not empty).
465fn unwind_history_shards<S, T, C>(
466    cursor: &mut C,
467    start_key: T::Key,
468    block_number: BlockNumber,
469    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
470) -> ProviderResult<Vec<u64>>
471where
472    T: Table<Value = BlockNumberList>,
473    T::Key: AsRef<ShardedKey<S>>,
474    C: DbCursorRO<T> + DbCursorRW<T>,
475{
476    let mut item = cursor.seek_exact(start_key)?;
477    while let Some((sharded_key, list)) = item {
478        // If the shard does not belong to the key, break.
479        if !shard_belongs_to_key(&sharded_key) {
480            break
481        }
482        cursor.delete_current()?;
483
484        // Check the first item.
485        // If it is greater or eq to the block number, delete it.
486        let first = list.iter().next().expect("List can't be empty");
487        if first >= block_number {
488            item = cursor.prev()?;
489            continue
490        } else if block_number <= sharded_key.as_ref().highest_block_number {
491            // Filter out all elements greater than block number.
492            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
493        }
494        return Ok(list.iter().collect::<Vec<_>>())
495    }
496
497    Ok(Vec::new())
498}
499
500impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
501    /// Creates a provider with an inner read-only transaction.
502    pub const fn new(
503        tx: TX,
504        chain_spec: Arc<N::ChainSpec>,
505        static_file_provider: StaticFileProvider<N::Primitives>,
506        prune_modes: PruneModes,
507        storage: Arc<N::Storage>,
508    ) -> Self {
509        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
510    }
511
512    /// Consume `DbTx` or `DbTxMut`.
513    pub fn into_tx(self) -> TX {
514        self.tx
515    }
516
517    /// Pass `DbTx` or `DbTxMut` mutable reference.
518    pub const fn tx_mut(&mut self) -> &mut TX {
519        &mut self.tx
520    }
521
522    /// Pass `DbTx` or `DbTxMut` immutable reference.
523    pub const fn tx_ref(&self) -> &TX {
524        &self.tx
525    }
526
527    /// Returns a reference to the chain specification.
528    pub fn chain_spec(&self) -> &N::ChainSpec {
529        &self.chain_spec
530    }
531}
532
533impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
534    fn transactions_by_tx_range_with_cursor<C>(
535        &self,
536        range: impl RangeBounds<TxNumber>,
537        cursor: &mut C,
538    ) -> ProviderResult<Vec<TxTy<N>>>
539    where
540        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
541    {
542        self.static_file_provider.get_range_with_static_file_or_database(
543            StaticFileSegment::Transactions,
544            to_range(range),
545            |static_file, range, _| static_file.transactions_by_tx_range(range),
546            |range, _| self.cursor_collect(cursor, range),
547            |_| true,
548        )
549    }
550
551    fn recovered_block<H, HF, B, BF>(
552        &self,
553        id: BlockHashOrNumber,
554        _transaction_kind: TransactionVariant,
555        header_by_number: HF,
556        construct_block: BF,
557    ) -> ProviderResult<Option<B>>
558    where
559        H: AsRef<HeaderTy<N>>,
560        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
561        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
562    {
563        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
564        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
565
566        // Get the block body
567        //
568        // If the body indices are not found, this means that the transactions either do not exist
569        // in the database yet, or they do exit but are not indexed. If they exist but are not
570        // indexed, we don't have enough information to return the block anyways, so we return
571        // `None`.
572        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
573
574        let tx_range = body.tx_num_range();
575
576        let (transactions, senders) = if tx_range.is_empty() {
577            (vec![], vec![])
578        } else {
579            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
580        };
581
582        let body = self
583            .storage
584            .reader()
585            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
586            .pop()
587            .ok_or(ProviderError::InvalidStorageOutput)?;
588
589        construct_block(header, body, senders)
590    }
591
592    /// Returns a range of blocks from the database.
593    ///
594    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
595    /// construct blocks from the following inputs:
596    ///     – Header
597    ///     - Range of transaction numbers
598    ///     – Ommers
599    ///     – Withdrawals
600    ///     – Senders
601    fn block_range<F, H, HF, R>(
602        &self,
603        range: RangeInclusive<BlockNumber>,
604        headers_range: HF,
605        mut assemble_block: F,
606    ) -> ProviderResult<Vec<R>>
607    where
608        H: AsRef<HeaderTy<N>>,
609        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
610        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
611    {
612        if range.is_empty() {
613            return Ok(Vec::new())
614        }
615
616        let len = range.end().saturating_sub(*range.start()) as usize;
617        let mut blocks = Vec::with_capacity(len);
618
619        let headers = headers_range(range.clone())?;
620        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
621
622        // If the body indices are not found, this means that the transactions either do
623        // not exist in the database yet, or they do exit but are
624        // not indexed. If they exist but are not indexed, we don't
625        // have enough information to return the block anyways, so
626        // we skip the block.
627        let present_headers = self
628            .block_body_indices_range(range)?
629            .into_iter()
630            .map(|b| b.tx_num_range())
631            .zip(headers)
632            .collect::<Vec<_>>();
633
634        let mut inputs = Vec::new();
635        for (tx_range, header) in &present_headers {
636            let transactions = if tx_range.is_empty() {
637                Vec::new()
638            } else {
639                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
640            };
641
642            inputs.push((header.as_ref(), transactions));
643        }
644
645        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
646
647        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
648            blocks.push(assemble_block(header, body, tx_range)?);
649        }
650
651        Ok(blocks)
652    }
653
654    /// Returns a range of blocks from the database, along with the senders of each
655    /// transaction in the blocks.
656    ///
657    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
658    /// construct blocks from the following inputs:
659    ///     – Header
660    ///     - Transactions
661    ///     – Ommers
662    ///     – Withdrawals
663    ///     – Senders
664    fn block_with_senders_range<H, HF, B, BF>(
665        &self,
666        range: RangeInclusive<BlockNumber>,
667        headers_range: HF,
668        assemble_block: BF,
669    ) -> ProviderResult<Vec<B>>
670    where
671        H: AsRef<HeaderTy<N>>,
672        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
673        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
674    {
675        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
676
677        self.block_range(range, headers_range, |header, body, tx_range| {
678            let senders = if tx_range.is_empty() {
679                Vec::new()
680            } else {
681                // fetch senders from the senders table
682                let known_senders =
683                    senders_cursor
684                        .walk_range(tx_range.clone())?
685                        .collect::<Result<HashMap<_, _>, _>>()?;
686
687                let mut senders = Vec::with_capacity(body.transactions().len());
688                for (tx_num, tx) in tx_range.zip(body.transactions()) {
689                    match known_senders.get(&tx_num) {
690                        None => {
691                            // recover the sender from the transaction if not found
692                            let sender = tx.recover_signer_unchecked()?;
693                            senders.push(sender);
694                        }
695                        Some(sender) => senders.push(*sender),
696                    }
697                }
698
699                senders
700            };
701
702            assemble_block(header, body, senders)
703        })
704    }
705
706    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
707    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
708    /// account changesets.
709    fn populate_bundle_state<A, S>(
710        &self,
711        account_changeset: Vec<(u64, AccountBeforeTx)>,
712        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
713        plain_accounts_cursor: &mut A,
714        plain_storage_cursor: &mut S,
715    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
716    where
717        A: DbCursorRO<PlainAccountState>,
718        S: DbDupCursorRO<PlainStorageState>,
719    {
720        // iterate previous value and get plain state value to create changeset
721        // Double option around Account represent if Account state is know (first option) and
722        // account is removed (Second Option)
723        let mut state: BundleStateInit = HashMap::default();
724
725        // This is not working for blocks that are not at tip. as plain state is not the last
726        // state of end range. We should rename the functions or add support to access
727        // History state. Accessing history state can be tricky but we are not gaining
728        // anything.
729
730        let mut reverts: RevertsInit = HashMap::default();
731
732        // add account changeset changes
733        for (block_number, account_before) in account_changeset.into_iter().rev() {
734            let AccountBeforeTx { info: old_info, address } = account_before;
735            match state.entry(address) {
736                hash_map::Entry::Vacant(entry) => {
737                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
738                    entry.insert((old_info, new_info, HashMap::default()));
739                }
740                hash_map::Entry::Occupied(mut entry) => {
741                    // overwrite old account state.
742                    entry.get_mut().0 = old_info;
743                }
744            }
745            // insert old info into reverts.
746            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
747        }
748
749        // add storage changeset changes
750        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
751            let BlockNumberAddress((block_number, address)) = block_and_address;
752            // get account state or insert from plain state.
753            let account_state = match state.entry(address) {
754                hash_map::Entry::Vacant(entry) => {
755                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
756                    entry.insert((present_info, present_info, HashMap::default()))
757                }
758                hash_map::Entry::Occupied(entry) => entry.into_mut(),
759            };
760
761            // match storage.
762            match account_state.2.entry(old_storage.key) {
763                hash_map::Entry::Vacant(entry) => {
764                    let new_storage = plain_storage_cursor
765                        .seek_by_key_subkey(address, old_storage.key)?
766                        .filter(|storage| storage.key == old_storage.key)
767                        .unwrap_or_default();
768                    entry.insert((old_storage.value, new_storage.value));
769                }
770                hash_map::Entry::Occupied(mut entry) => {
771                    entry.get_mut().0 = old_storage.value;
772                }
773            };
774
775            reverts
776                .entry(block_number)
777                .or_default()
778                .entry(address)
779                .or_default()
780                .1
781                .push(old_storage);
782        }
783
784        Ok((state, reverts))
785    }
786}
787
788impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
789    /// Commit database transaction.
790    pub fn commit(self) -> ProviderResult<bool> {
791        Ok(self.tx.commit()?)
792    }
793
794    /// Load shard and remove it. If list is empty, last shard was full or
795    /// there are no shards at all.
796    fn take_shard<T>(
797        &self,
798        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
799        key: T::Key,
800    ) -> ProviderResult<Vec<u64>>
801    where
802        T: Table<Value = BlockNumberList>,
803    {
804        if let Some((_, list)) = cursor.seek_exact(key)? {
805            // delete old shard so new one can be inserted.
806            cursor.delete_current()?;
807            let list = list.iter().collect::<Vec<_>>();
808            return Ok(list)
809        }
810        Ok(Vec::new())
811    }
812
813    /// Insert history index to the database.
814    ///
815    /// For each updated partial key, this function removes the last shard from
816    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
817    /// inserts the new shards back into the database.
818    ///
819    /// This function is used by history indexing stages.
820    fn append_history_index<P, T>(
821        &self,
822        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
823        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
824    ) -> ProviderResult<()>
825    where
826        P: Copy,
827        T: Table<Value = BlockNumberList>,
828    {
829        let mut cursor = self.tx.cursor_write::<T>()?;
830        for (partial_key, indices) in index_updates {
831            let mut last_shard =
832                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
833            last_shard.extend(indices);
834            // Chunk indices and insert them in shards of N size.
835            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
836            while let Some(list) = chunks.next() {
837                let highest_block_number = if chunks.peek().is_some() {
838                    *list.last().expect("`chunks` does not return empty list")
839                } else {
840                    // Insert last list with `u64::MAX`.
841                    u64::MAX
842                };
843                cursor.insert(
844                    sharded_key_factory(partial_key, highest_block_number),
845                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
846                )?;
847            }
848        }
849        Ok(())
850    }
851}
852
853impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
854    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
855        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
856    }
857}
858
859impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
860    fn changed_accounts_with_range(
861        &self,
862        range: impl RangeBounds<BlockNumber>,
863    ) -> ProviderResult<BTreeSet<Address>> {
864        self.tx
865            .cursor_read::<tables::AccountChangeSets>()?
866            .walk_range(range)?
867            .map(|entry| {
868                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
869            })
870            .collect()
871    }
872
873    fn basic_accounts(
874        &self,
875        iter: impl IntoIterator<Item = Address>,
876    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
877        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
878        Ok(iter
879            .into_iter()
880            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
881            .collect::<Result<Vec<_>, _>>()?)
882    }
883
884    fn changed_accounts_and_blocks_with_range(
885        &self,
886        range: RangeInclusive<BlockNumber>,
887    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
888        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
889
890        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
891            BTreeMap::new(),
892            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
893                let (index, account) = entry?;
894                accounts.entry(account.address).or_default().push(index);
895                Ok(accounts)
896            },
897        )?;
898
899        Ok(account_transitions)
900    }
901}
902
903impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
904    fn storage_changeset(
905        &self,
906        block_number: BlockNumber,
907    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
908        let range = block_number..=block_number;
909        let storage_range = BlockNumberAddress::range(range);
910        self.tx
911            .cursor_dup_read::<tables::StorageChangeSets>()?
912            .walk_range(storage_range)?
913            .map(|result| -> ProviderResult<_> { Ok(result?) })
914            .collect()
915    }
916}
917
918impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
919    fn account_block_changeset(
920        &self,
921        block_number: BlockNumber,
922    ) -> ProviderResult<Vec<AccountBeforeTx>> {
923        let range = block_number..=block_number;
924        self.tx
925            .cursor_read::<tables::AccountChangeSets>()?
926            .walk_range(range)?
927            .map(|result| -> ProviderResult<_> {
928                let (_, account_before) = result?;
929                Ok(account_before)
930            })
931            .collect()
932    }
933}
934
935impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
936    for DatabaseProvider<TX, N>
937{
938    type Header = HeaderTy<N>;
939
940    fn local_tip_header(
941        &self,
942        highest_uninterrupted_block: BlockNumber,
943    ) -> ProviderResult<SealedHeader<Self::Header>> {
944        let static_file_provider = self.static_file_provider();
945
946        // Make sure Headers static file is at the same height. If it's further, this
947        // input execution was interrupted previously and we need to unwind the static file.
948        let next_static_file_block_num = static_file_provider
949            .get_highest_static_file_block(StaticFileSegment::Headers)
950            .map(|id| id + 1)
951            .unwrap_or_default();
952        let next_block = highest_uninterrupted_block + 1;
953
954        match next_static_file_block_num.cmp(&next_block) {
955            // The node shutdown between an executed static file commit and before the database
956            // commit, so we need to unwind the static files.
957            Ordering::Greater => {
958                let mut static_file_producer =
959                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
960                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
961                // Since this is a database <-> static file inconsistency, we commit the change
962                // straight away.
963                static_file_producer.commit()?
964            }
965            Ordering::Less => {
966                // There's either missing or corrupted files.
967                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
968            }
969            Ordering::Equal => {}
970        }
971
972        let local_head = static_file_provider
973            .sealed_header(highest_uninterrupted_block)?
974            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
975
976        Ok(local_head)
977    }
978}
979
980impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
981    type Header = HeaderTy<N>;
982
983    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
984        if let Some(num) = self.block_number(*block_hash)? {
985            Ok(self.header_by_number(num)?)
986        } else {
987            Ok(None)
988        }
989    }
990
991    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
992        self.static_file_provider.get_with_static_file_or_database(
993            StaticFileSegment::Headers,
994            num,
995            |static_file| static_file.header_by_number(num),
996            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
997        )
998    }
999
1000    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1001        if let Some(num) = self.block_number(*block_hash)? {
1002            self.header_td_by_number(num)
1003        } else {
1004            Ok(None)
1005        }
1006    }
1007
1008    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1009        if self.chain_spec.is_paris_active_at_block(number) {
1010            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
1011                // if this block is higher than the final paris(merge) block, return the final paris
1012                // difficulty
1013                return Ok(Some(td))
1014            }
1015        }
1016
1017        self.static_file_provider.get_with_static_file_or_database(
1018            StaticFileSegment::Headers,
1019            number,
1020            |static_file| static_file.header_td_by_number(number),
1021            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1022        )
1023    }
1024
1025    fn headers_range(
1026        &self,
1027        range: impl RangeBounds<BlockNumber>,
1028    ) -> ProviderResult<Vec<Self::Header>> {
1029        self.static_file_provider.get_range_with_static_file_or_database(
1030            StaticFileSegment::Headers,
1031            to_range(range),
1032            |static_file, range, _| static_file.headers_range(range),
1033            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1034            |_| true,
1035        )
1036    }
1037
1038    fn sealed_header(
1039        &self,
1040        number: BlockNumber,
1041    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1042        self.static_file_provider.get_with_static_file_or_database(
1043            StaticFileSegment::Headers,
1044            number,
1045            |static_file| static_file.sealed_header(number),
1046            || {
1047                if let Some(header) = self.header_by_number(number)? {
1048                    let hash = self
1049                        .block_hash(number)?
1050                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1051                    Ok(Some(SealedHeader::new(header, hash)))
1052                } else {
1053                    Ok(None)
1054                }
1055            },
1056        )
1057    }
1058
1059    fn sealed_headers_while(
1060        &self,
1061        range: impl RangeBounds<BlockNumber>,
1062        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1063    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1064        self.static_file_provider.get_range_with_static_file_or_database(
1065            StaticFileSegment::Headers,
1066            to_range(range),
1067            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1068            |range, mut predicate| {
1069                let mut headers = vec![];
1070                for entry in
1071                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1072                {
1073                    let (number, header) = entry?;
1074                    let hash = self
1075                        .block_hash(number)?
1076                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1077                    let sealed = SealedHeader::new(header, hash);
1078                    if !predicate(&sealed) {
1079                        break
1080                    }
1081                    headers.push(sealed);
1082                }
1083                Ok(headers)
1084            },
1085            predicate,
1086        )
1087    }
1088}
1089
1090impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1091    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1092        self.static_file_provider.get_with_static_file_or_database(
1093            StaticFileSegment::Headers,
1094            number,
1095            |static_file| static_file.block_hash(number),
1096            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1097        )
1098    }
1099
1100    fn canonical_hashes_range(
1101        &self,
1102        start: BlockNumber,
1103        end: BlockNumber,
1104    ) -> ProviderResult<Vec<B256>> {
1105        self.static_file_provider.get_range_with_static_file_or_database(
1106            StaticFileSegment::Headers,
1107            start..end,
1108            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1109            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1110            |_| true,
1111        )
1112    }
1113}
1114
1115impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1116    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1117        let best_number = self.best_block_number()?;
1118        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1119        Ok(ChainInfo { best_hash, best_number })
1120    }
1121
1122    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1123        // The best block number is tracked via the finished stage which gets updated in the same tx
1124        // when new blocks committed
1125        Ok(self
1126            .get_stage_checkpoint(StageId::Finish)?
1127            .map(|checkpoint| checkpoint.block_number)
1128            .unwrap_or_default())
1129    }
1130
1131    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1132        Ok(self
1133            .tx
1134            .cursor_read::<tables::CanonicalHeaders>()?
1135            .last()?
1136            .map(|(num, _)| num)
1137            .max(
1138                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1139            )
1140            .unwrap_or_default())
1141    }
1142
1143    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1144        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1145    }
1146}
1147
1148impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1149    type Block = BlockTy<N>;
1150
1151    fn find_block_by_hash(
1152        &self,
1153        hash: B256,
1154        source: BlockSource,
1155    ) -> ProviderResult<Option<Self::Block>> {
1156        if source.is_canonical() {
1157            self.block(hash.into())
1158        } else {
1159            Ok(None)
1160        }
1161    }
1162
1163    /// Returns the block with matching number from database.
1164    ///
1165    /// If the header for this block is not found, this returns `None`.
1166    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1167    /// will return None.
1168    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1169        if let Some(number) = self.convert_hash_or_number(id)? {
1170            if let Some(header) = self.header_by_number(number)? {
1171                // If the body indices are not found, this means that the transactions either do not
1172                // exist in the database yet, or they do exit but are not indexed.
1173                // If they exist but are not indexed, we don't have enough
1174                // information to return the block anyways, so we return `None`.
1175                let Some(transactions) = self.transactions_by_block(number.into())? else {
1176                    return Ok(None)
1177                };
1178
1179                let body = self
1180                    .storage
1181                    .reader()
1182                    .read_block_bodies(self, vec![(&header, transactions)])?
1183                    .pop()
1184                    .ok_or(ProviderError::InvalidStorageOutput)?;
1185
1186                return Ok(Some(Self::Block::new(header, body)))
1187            }
1188        }
1189
1190        Ok(None)
1191    }
1192
1193    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1194        Ok(None)
1195    }
1196
1197    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1198        Ok(None)
1199    }
1200
1201    fn pending_block_and_receipts(
1202        &self,
1203    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1204        Ok(None)
1205    }
1206
1207    /// Returns the block with senders with matching number or hash from database.
1208    ///
1209    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1210    /// spot, and we want fast querying.**
1211    ///
1212    /// If the header for this block is not found, this returns `None`.
1213    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1214    /// will return None.
1215    fn recovered_block(
1216        &self,
1217        id: BlockHashOrNumber,
1218        transaction_kind: TransactionVariant,
1219    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1220        self.recovered_block(
1221            id,
1222            transaction_kind,
1223            |block_number| self.header_by_number(block_number),
1224            |header, body, senders| {
1225                Self::Block::new(header, body)
1226                    // Note: we're using unchecked here because we know the block contains valid txs
1227                    // wrt to its height and can ignore the s value check so pre
1228                    // EIP-2 txs are allowed
1229                    .try_into_recovered_unchecked(senders)
1230                    .map(Some)
1231                    .map_err(|_| ProviderError::SenderRecoveryError)
1232            },
1233        )
1234    }
1235
1236    fn sealed_block_with_senders(
1237        &self,
1238        id: BlockHashOrNumber,
1239        transaction_kind: TransactionVariant,
1240    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1241        self.recovered_block(
1242            id,
1243            transaction_kind,
1244            |block_number| self.sealed_header(block_number),
1245            |header, body, senders| {
1246                Self::Block::new_sealed(header, body)
1247                    // Note: we're using unchecked here because we know the block contains valid txs
1248                    // wrt to its height and can ignore the s value check so pre
1249                    // EIP-2 txs are allowed
1250                    .try_with_senders_unchecked(senders)
1251                    .map(Some)
1252                    .map_err(|_| ProviderError::SenderRecoveryError)
1253            },
1254        )
1255    }
1256
1257    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1258        self.block_range(
1259            range,
1260            |range| self.headers_range(range),
1261            |header, body, _| Ok(Self::Block::new(header, body)),
1262        )
1263    }
1264
1265    fn block_with_senders_range(
1266        &self,
1267        range: RangeInclusive<BlockNumber>,
1268    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1269        self.block_with_senders_range(
1270            range,
1271            |range| self.headers_range(range),
1272            |header, body, senders| {
1273                Self::Block::new(header, body)
1274                    .try_into_recovered_unchecked(senders)
1275                    .map_err(|_| ProviderError::SenderRecoveryError)
1276            },
1277        )
1278    }
1279
1280    fn recovered_block_range(
1281        &self,
1282        range: RangeInclusive<BlockNumber>,
1283    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1284        self.block_with_senders_range(
1285            range,
1286            |range| self.sealed_headers_range(range),
1287            |header, body, senders| {
1288                Self::Block::new_sealed(header, body)
1289                    .try_with_senders(senders)
1290                    .map_err(|_| ProviderError::SenderRecoveryError)
1291            },
1292        )
1293    }
1294}
1295
1296impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1297    for DatabaseProvider<TX, N>
1298{
1299    /// Recovers transaction hashes by walking through `Transactions` table and
1300    /// calculating them in a parallel manner. Returned unsorted.
1301    fn transaction_hashes_by_range(
1302        &self,
1303        tx_range: Range<TxNumber>,
1304    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1305        self.static_file_provider.get_range_with_static_file_or_database(
1306            StaticFileSegment::Transactions,
1307            tx_range,
1308            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1309            |tx_range, _| {
1310                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1311                let tx_range_size = tx_range.clone().count();
1312                let tx_walker = tx_cursor.walk_range(tx_range)?;
1313
1314                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1315                let mut channels = Vec::with_capacity(chunk_size);
1316                let mut transaction_count = 0;
1317
1318                #[inline]
1319                fn calculate_hash<T>(
1320                    entry: Result<(TxNumber, T), DatabaseError>,
1321                    rlp_buf: &mut Vec<u8>,
1322                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1323                where
1324                    T: Encodable2718,
1325                {
1326                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1327                    tx.encode_2718(rlp_buf);
1328                    Ok((keccak256(rlp_buf), tx_id))
1329                }
1330
1331                for chunk in &tx_walker.chunks(chunk_size) {
1332                    let (tx, rx) = mpsc::channel();
1333                    channels.push(rx);
1334
1335                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1336                    // not Send)
1337                    let chunk: Vec<_> = chunk.collect();
1338                    transaction_count += chunk.len();
1339
1340                    // Spawn the task onto the global rayon pool
1341                    // This task will send the results through the channel after it has calculated
1342                    // the hash.
1343                    rayon::spawn(move || {
1344                        let mut rlp_buf = Vec::with_capacity(128);
1345                        for entry in chunk {
1346                            rlp_buf.clear();
1347                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1348                        }
1349                    });
1350                }
1351                let mut tx_list = Vec::with_capacity(transaction_count);
1352
1353                // Iterate over channels and append the tx hashes unsorted
1354                for channel in channels {
1355                    while let Ok(tx) = channel.recv() {
1356                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1357                        tx_list.push((tx_hash, tx_id));
1358                    }
1359                }
1360
1361                Ok(tx_list)
1362            },
1363            |_| true,
1364        )
1365    }
1366}
1367
1368// Calculates the hash of the given transaction
1369impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1370    type Transaction = TxTy<N>;
1371
1372    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1373        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1374    }
1375
1376    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1377        self.static_file_provider.get_with_static_file_or_database(
1378            StaticFileSegment::Transactions,
1379            id,
1380            |static_file| static_file.transaction_by_id(id),
1381            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1382        )
1383    }
1384
1385    fn transaction_by_id_unhashed(
1386        &self,
1387        id: TxNumber,
1388    ) -> ProviderResult<Option<Self::Transaction>> {
1389        self.static_file_provider.get_with_static_file_or_database(
1390            StaticFileSegment::Transactions,
1391            id,
1392            |static_file| static_file.transaction_by_id_unhashed(id),
1393            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1394        )
1395    }
1396
1397    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1398        if let Some(id) = self.transaction_id(hash)? {
1399            Ok(self.transaction_by_id_unhashed(id)?)
1400        } else {
1401            Ok(None)
1402        }
1403    }
1404
1405    fn transaction_by_hash_with_meta(
1406        &self,
1407        tx_hash: TxHash,
1408    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1409        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1410        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1411            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1412                if let Some(block_number) =
1413                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1414                {
1415                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1416                        let (header, block_hash) = sealed_header.split();
1417                        if let Some(block_body) = self.block_body_indices(block_number)? {
1418                            // the index of the tx in the block is the offset:
1419                            // len([start..tx_id])
1420                            // NOTE: `transaction_id` is always `>=` the block's first
1421                            // index
1422                            let index = transaction_id - block_body.first_tx_num();
1423
1424                            let meta = TransactionMeta {
1425                                tx_hash,
1426                                index,
1427                                block_hash,
1428                                block_number,
1429                                base_fee: header.base_fee_per_gas(),
1430                                excess_blob_gas: header.excess_blob_gas(),
1431                                timestamp: header.timestamp(),
1432                            };
1433
1434                            return Ok(Some((transaction, meta)))
1435                        }
1436                    }
1437                }
1438            }
1439        }
1440
1441        Ok(None)
1442    }
1443
1444    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1445        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1446        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1447    }
1448
1449    fn transactions_by_block(
1450        &self,
1451        id: BlockHashOrNumber,
1452    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1453        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1454
1455        if let Some(block_number) = self.convert_hash_or_number(id)? {
1456            if let Some(body) = self.block_body_indices(block_number)? {
1457                let tx_range = body.tx_num_range();
1458                return if tx_range.is_empty() {
1459                    Ok(Some(Vec::new()))
1460                } else {
1461                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1462                }
1463            }
1464        }
1465        Ok(None)
1466    }
1467
1468    fn transactions_by_block_range(
1469        &self,
1470        range: impl RangeBounds<BlockNumber>,
1471    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1472        let range = to_range(range);
1473        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1474
1475        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1476            .into_iter()
1477            .map(|body| {
1478                let tx_num_range = body.tx_num_range();
1479                if tx_num_range.is_empty() {
1480                    Ok(Vec::new())
1481                } else {
1482                    Ok(self
1483                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1484                        .into_iter()
1485                        .collect())
1486                }
1487            })
1488            .collect()
1489    }
1490
1491    fn transactions_by_tx_range(
1492        &self,
1493        range: impl RangeBounds<TxNumber>,
1494    ) -> ProviderResult<Vec<Self::Transaction>> {
1495        self.transactions_by_tx_range_with_cursor(
1496            range,
1497            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1498        )
1499    }
1500
1501    fn senders_by_tx_range(
1502        &self,
1503        range: impl RangeBounds<TxNumber>,
1504    ) -> ProviderResult<Vec<Address>> {
1505        self.cursor_read_collect::<tables::TransactionSenders>(range)
1506    }
1507
1508    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1509        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1510    }
1511}
1512
1513impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1514    type Receipt = ReceiptTy<N>;
1515
1516    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1517        self.static_file_provider.get_with_static_file_or_database(
1518            StaticFileSegment::Receipts,
1519            id,
1520            |static_file| static_file.receipt(id),
1521            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1522        )
1523    }
1524
1525    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1526        if let Some(id) = self.transaction_id(hash)? {
1527            self.receipt(id)
1528        } else {
1529            Ok(None)
1530        }
1531    }
1532
1533    fn receipts_by_block(
1534        &self,
1535        block: BlockHashOrNumber,
1536    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1537        if let Some(number) = self.convert_hash_or_number(block)? {
1538            if let Some(body) = self.block_body_indices(number)? {
1539                let tx_range = body.tx_num_range();
1540                return if tx_range.is_empty() {
1541                    Ok(Some(Vec::new()))
1542                } else {
1543                    self.receipts_by_tx_range(tx_range).map(Some)
1544                }
1545            }
1546        }
1547        Ok(None)
1548    }
1549
1550    fn receipts_by_tx_range(
1551        &self,
1552        range: impl RangeBounds<TxNumber>,
1553    ) -> ProviderResult<Vec<Self::Receipt>> {
1554        self.static_file_provider.get_range_with_static_file_or_database(
1555            StaticFileSegment::Receipts,
1556            to_range(range),
1557            |static_file, range, _| static_file.receipts_by_tx_range(range),
1558            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1559            |_| true,
1560        )
1561    }
1562}
1563
1564impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1565    for DatabaseProvider<TX, N>
1566{
1567    fn withdrawals_by_block(
1568        &self,
1569        id: BlockHashOrNumber,
1570        timestamp: u64,
1571    ) -> ProviderResult<Option<Withdrawals>> {
1572        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1573            if let Some(number) = self.convert_hash_or_number(id)? {
1574                return self.static_file_provider.get_with_static_file_or_database(
1575                    StaticFileSegment::BlockMeta,
1576                    number,
1577                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
1578                    || {
1579                        // If we are past shanghai, then all blocks should have a withdrawal list,
1580                        // even if empty
1581                        let withdrawals = self
1582                            .tx
1583                            .get::<tables::BlockWithdrawals>(number)
1584                            .map(|w| w.map(|w| w.withdrawals))?
1585                            .unwrap_or_default();
1586                        Ok(Some(withdrawals))
1587                    },
1588                )
1589            }
1590        }
1591        Ok(None)
1592    }
1593}
1594
1595impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1596    /// Returns the ommers for the block with matching id from the database.
1597    ///
1598    /// If the block is not found, this returns `None`.
1599    /// If the block exists, but doesn't contain ommers, this returns `None`.
1600    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1601        if let Some(number) = self.convert_hash_or_number(id)? {
1602            // If the Paris (Merge) hardfork block is known and block is after it, return empty
1603            // ommers.
1604            if self.chain_spec.is_paris_active_at_block(number) {
1605                return Ok(Some(Vec::new()))
1606            }
1607
1608            return self.static_file_provider.get_with_static_file_or_database(
1609                StaticFileSegment::BlockMeta,
1610                number,
1611                |static_file| static_file.ommers(id),
1612                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
1613            )
1614        }
1615
1616        Ok(None)
1617    }
1618}
1619
1620impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1621    for DatabaseProvider<TX, N>
1622{
1623    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1624        self.static_file_provider.get_with_static_file_or_database(
1625            StaticFileSegment::BlockMeta,
1626            num,
1627            |static_file| static_file.block_body_indices(num),
1628            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
1629        )
1630    }
1631
1632    fn block_body_indices_range(
1633        &self,
1634        range: RangeInclusive<BlockNumber>,
1635    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1636        self.static_file_provider.get_range_with_static_file_or_database(
1637            StaticFileSegment::BlockMeta,
1638            *range.start()..*range.end() + 1,
1639            |static_file, range, _| {
1640                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
1641            },
1642            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
1643            |_| true,
1644        )
1645    }
1646}
1647
1648impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1649    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1650        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1651    }
1652
1653    /// Get stage checkpoint progress.
1654    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1655        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1656    }
1657
1658    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1659        self.tx
1660            .cursor_read::<tables::StageCheckpoints>()?
1661            .walk(None)?
1662            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1663            .map_err(ProviderError::Database)
1664    }
1665}
1666
1667impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1668    /// Save stage checkpoint.
1669    fn save_stage_checkpoint(
1670        &self,
1671        id: StageId,
1672        checkpoint: StageCheckpoint,
1673    ) -> ProviderResult<()> {
1674        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1675    }
1676
1677    /// Save stage checkpoint progress.
1678    fn save_stage_checkpoint_progress(
1679        &self,
1680        id: StageId,
1681        checkpoint: Vec<u8>,
1682    ) -> ProviderResult<()> {
1683        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1684    }
1685
1686    fn update_pipeline_stages(
1687        &self,
1688        block_number: BlockNumber,
1689        drop_stage_checkpoint: bool,
1690    ) -> ProviderResult<()> {
1691        // iterate over all existing stages in the table and update its progress.
1692        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1693        for stage_id in StageId::ALL {
1694            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1695            cursor.upsert(
1696                stage_id.to_string(),
1697                &StageCheckpoint {
1698                    block_number,
1699                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1700                },
1701            )?;
1702        }
1703
1704        Ok(())
1705    }
1706}
1707
1708impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1709    fn plain_state_storages(
1710        &self,
1711        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1712    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1713        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1714
1715        addresses_with_keys
1716            .into_iter()
1717            .map(|(address, storage)| {
1718                storage
1719                    .into_iter()
1720                    .map(|key| -> ProviderResult<_> {
1721                        Ok(plain_storage
1722                            .seek_by_key_subkey(address, key)?
1723                            .filter(|v| v.key == key)
1724                            .unwrap_or_else(|| StorageEntry { key, value: Default::default() }))
1725                    })
1726                    .collect::<ProviderResult<Vec<_>>>()
1727                    .map(|storage| (address, storage))
1728            })
1729            .collect::<ProviderResult<Vec<(_, _)>>>()
1730    }
1731
1732    fn changed_storages_with_range(
1733        &self,
1734        range: RangeInclusive<BlockNumber>,
1735    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1736        self.tx
1737            .cursor_read::<tables::StorageChangeSets>()?
1738            .walk_range(BlockNumberAddress::range(range))?
1739            // fold all storages and save its old state so we can remove it from HashedStorage
1740            // it is needed as it is dup table.
1741            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1742                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1743                accounts.entry(address).or_default().insert(storage_entry.key);
1744                Ok(accounts)
1745            })
1746    }
1747
1748    fn changed_storages_and_blocks_with_range(
1749        &self,
1750        range: RangeInclusive<BlockNumber>,
1751    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1752        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1753
1754        let storage_changeset_lists =
1755            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1756                BTreeMap::new(),
1757                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1758                    let (index, storage) = entry?;
1759                    storages
1760                        .entry((index.address(), storage.key))
1761                        .or_default()
1762                        .push(index.block_number());
1763                    Ok(storages)
1764                },
1765            )?;
1766
1767        Ok(storage_changeset_lists)
1768    }
1769}
1770
1771impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1772    for DatabaseProvider<TX, N>
1773{
1774    type Receipt = ReceiptTy<N>;
1775
1776    fn write_state(
1777        &self,
1778        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1779        is_value_known: OriginalValuesKnown,
1780        write_receipts_to: StorageLocation,
1781    ) -> ProviderResult<()> {
1782        let first_block = execution_outcome.first_block();
1783        let block_count = execution_outcome.len() as u64;
1784        let last_block = execution_outcome.last_block();
1785        let block_range = first_block..=last_block;
1786
1787        let tip = self.last_block_number()?.max(last_block);
1788
1789        let (plain_state, reverts) =
1790            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1791
1792        self.write_state_reverts(reverts, first_block)?;
1793        self.write_state_changes(plain_state)?;
1794
1795        // Fetch the first transaction number for each block in the range
1796        let block_indices: Vec<_> = self
1797            .block_body_indices_range(block_range)?
1798            .into_iter()
1799            .map(|b| b.first_tx_num)
1800            .collect();
1801
1802        // Ensure all expected blocks are present.
1803        if block_indices.len() < block_count as usize {
1804            let missing_blocks = block_count - block_indices.len() as u64;
1805            return Err(ProviderError::BlockBodyIndicesNotFound(
1806                last_block.saturating_sub(missing_blocks - 1),
1807            ));
1808        }
1809
1810        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1811
1812        // Prepare receipts cursor if we are going to write receipts to the database
1813        //
1814        // We are writing to database if requested or if there's any kind of receipt pruning
1815        // configured
1816        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1817            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1818            .transpose()?;
1819
1820        // Prepare receipts static writer if we are going to write receipts to static files
1821        //
1822        // We are writing to static files if requested and if there's no receipt pruning configured
1823        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1824            !has_receipts_pruning)
1825            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1826            .transpose()?;
1827
1828        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1829        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1830
1831        // All receipts from the last 128 blocks are required for blockchain tree, even with
1832        // [`PruneSegment::ContractLogs`].
1833        let prunable_receipts =
1834            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1835
1836        // Prepare set of addresses which logs should not be pruned.
1837        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1838        for (_, addresses) in contract_log_pruner.range(..first_block) {
1839            allowed_addresses.extend(addresses.iter().copied());
1840        }
1841
1842        for (idx, (receipts, first_tx_index)) in
1843            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1844        {
1845            let block_number = first_block + idx as u64;
1846
1847            // Increment block number for receipts static file writer
1848            if let Some(writer) = receipts_static_writer.as_mut() {
1849                writer.increment_block(block_number)?;
1850            }
1851
1852            // Skip writing receipts if pruning configuration requires us to.
1853            if prunable_receipts &&
1854                self.prune_modes
1855                    .receipts
1856                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1857            {
1858                continue
1859            }
1860
1861            // If there are new addresses to retain after this block number, track them
1862            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1863                allowed_addresses.extend(new_addresses.iter().copied());
1864            }
1865
1866            for (idx, receipt) in receipts.iter().enumerate() {
1867                let receipt_idx = first_tx_index + idx as u64;
1868                // Skip writing receipt if log filter is active and it does not have any logs to
1869                // retain
1870                if prunable_receipts &&
1871                    has_contract_log_filter &&
1872                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1873                {
1874                    continue
1875                }
1876
1877                if let Some(writer) = &mut receipts_static_writer {
1878                    writer.append_receipt(receipt_idx, receipt)?;
1879                }
1880
1881                if let Some(cursor) = &mut receipts_cursor {
1882                    cursor.append(receipt_idx, receipt)?;
1883                }
1884            }
1885        }
1886
1887        Ok(())
1888    }
1889
1890    fn write_state_reverts(
1891        &self,
1892        reverts: PlainStateReverts,
1893        first_block: BlockNumber,
1894    ) -> ProviderResult<()> {
1895        // Write storage changes
1896        tracing::trace!("Writing storage changes");
1897        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1898        let mut storage_changeset_cursor =
1899            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1900        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1901            let block_number = first_block + block_index as BlockNumber;
1902
1903            tracing::trace!(block_number, "Writing block change");
1904            // sort changes by address.
1905            storage_changes.par_sort_unstable_by_key(|a| a.address);
1906            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1907                let storage_id = BlockNumberAddress((block_number, address));
1908
1909                let mut storage = storage_revert
1910                    .into_iter()
1911                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1912                    .collect::<Vec<_>>();
1913                // sort storage slots by key.
1914                storage.par_sort_unstable_by_key(|a| a.0);
1915
1916                // If we are writing the primary storage wipe transition, the pre-existing plain
1917                // storage state has to be taken from the database and written to storage history.
1918                // See [StorageWipe::Primary] for more details.
1919                let mut wiped_storage = Vec::new();
1920                if wiped {
1921                    tracing::trace!(?address, "Wiping storage");
1922                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1923                        wiped_storage.push((entry.key, entry.value));
1924                        while let Some(entry) = storages_cursor.next_dup_val()? {
1925                            wiped_storage.push((entry.key, entry.value))
1926                        }
1927                    }
1928                }
1929
1930                tracing::trace!(?address, ?storage, "Writing storage reverts");
1931                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1932                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1933                }
1934            }
1935        }
1936
1937        // Write account changes
1938        tracing::trace!("Writing account changes");
1939        let mut account_changeset_cursor =
1940            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1941
1942        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1943            let block_number = first_block + block_index as BlockNumber;
1944            // Sort accounts by address.
1945            account_block_reverts.par_sort_by_key(|a| a.0);
1946
1947            for (address, info) in account_block_reverts {
1948                account_changeset_cursor.append_dup(
1949                    block_number,
1950                    AccountBeforeTx { address, info: info.map(Into::into) },
1951                )?;
1952            }
1953        }
1954
1955        Ok(())
1956    }
1957
1958    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1959        // sort all entries so they can be written to database in more performant way.
1960        // and take smaller memory footprint.
1961        changes.accounts.par_sort_by_key(|a| a.0);
1962        changes.storage.par_sort_by_key(|a| a.address);
1963        changes.contracts.par_sort_by_key(|a| a.0);
1964
1965        // Write new account state
1966        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1967        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1968        // write account to database.
1969        for (address, account) in changes.accounts {
1970            if let Some(account) = account {
1971                tracing::trace!(?address, "Updating plain state account");
1972                accounts_cursor.upsert(address, &account.into())?;
1973            } else if accounts_cursor.seek_exact(address)?.is_some() {
1974                tracing::trace!(?address, "Deleting plain state account");
1975                accounts_cursor.delete_current()?;
1976            }
1977        }
1978
1979        // Write bytecode
1980        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1981        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1982        for (hash, bytecode) in changes.contracts {
1983            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1984        }
1985
1986        // Write new storage state and wipe storage if needed.
1987        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1988        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1989        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1990            // Wiping of storage.
1991            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1992                storages_cursor.delete_current_duplicates()?;
1993            }
1994            // cast storages to B256.
1995            let mut storage = storage
1996                .into_iter()
1997                .map(|(k, value)| StorageEntry { key: k.into(), value })
1998                .collect::<Vec<_>>();
1999            // sort storage slots by key.
2000            storage.par_sort_unstable_by_key(|a| a.key);
2001
2002            for entry in storage {
2003                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2004                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2005                    if db_entry.key == entry.key {
2006                        storages_cursor.delete_current()?;
2007                    }
2008                }
2009
2010                if !entry.value.is_zero() {
2011                    storages_cursor.upsert(address, &entry)?;
2012                }
2013            }
2014        }
2015
2016        Ok(())
2017    }
2018
2019    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2020        // Write hashed account updates.
2021        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2022        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2023            if let Some(account) = account {
2024                hashed_accounts_cursor.upsert(hashed_address, &account)?;
2025            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2026                hashed_accounts_cursor.delete_current()?;
2027            }
2028        }
2029
2030        // Write hashed storage changes.
2031        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2032        let mut hashed_storage_cursor =
2033            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2034        for (hashed_address, storage) in sorted_storages {
2035            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2036                hashed_storage_cursor.delete_current_duplicates()?;
2037            }
2038
2039            for (hashed_slot, value) in storage.storage_slots_sorted() {
2040                let entry = StorageEntry { key: hashed_slot, value };
2041                if let Some(db_entry) =
2042                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2043                {
2044                    if db_entry.key == entry.key {
2045                        hashed_storage_cursor.delete_current()?;
2046                    }
2047                }
2048
2049                if !entry.value.is_zero() {
2050                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2051                }
2052            }
2053        }
2054
2055        Ok(())
2056    }
2057
2058    /// Remove the last N blocks of state.
2059    ///
2060    /// The latest state will be unwound
2061    ///
2062    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2063    ///    transaction ids.
2064    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2065    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2066    ///    the changesets.
2067    ///    - In order to have both the old and new values in the changesets, we also access the
2068    ///      plain state tables.
2069    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2070    ///    we:
2071    ///     1. Take the old value from the changeset
2072    ///     2. Take the new value from the plain state
2073    ///     3. Save the old value to the local state
2074    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2075    ///    have seen before we:
2076    ///     1. Take the old value from the changeset
2077    ///     2. Take the new value from the local state
2078    ///     3. Set the local state to the value in the changeset
2079    fn remove_state_above(
2080        &self,
2081        block: BlockNumber,
2082        remove_receipts_from: StorageLocation,
2083    ) -> ProviderResult<()> {
2084        let range = block + 1..=self.last_block_number()?;
2085
2086        if range.is_empty() {
2087            return Ok(());
2088        }
2089
2090        // We are not removing block meta as it is used to get block changesets.
2091        let block_bodies = self.block_body_indices_range(range.clone())?;
2092
2093        // get transaction receipts
2094        let from_transaction_num =
2095            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2096
2097        let storage_range = BlockNumberAddress::range(range.clone());
2098
2099        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2100        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2101
2102        // This is not working for blocks that are not at tip. as plain state is not the last
2103        // state of end range. We should rename the functions or add support to access
2104        // History state. Accessing history state can be tricky but we are not gaining
2105        // anything.
2106        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2107        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2108
2109        let (state, _) = self.populate_bundle_state(
2110            account_changeset,
2111            storage_changeset,
2112            &mut plain_accounts_cursor,
2113            &mut plain_storage_cursor,
2114        )?;
2115
2116        // iterate over local plain state remove all account and all storages.
2117        for (address, (old_account, new_account, storage)) in &state {
2118            // revert account if needed.
2119            if old_account != new_account {
2120                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2121                if let Some(account) = old_account {
2122                    plain_accounts_cursor.upsert(*address, account)?;
2123                } else if existing_entry.is_some() {
2124                    plain_accounts_cursor.delete_current()?;
2125                }
2126            }
2127
2128            // revert storages
2129            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2130                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2131                // delete previous value
2132                // TODO: This does not use dupsort features
2133                if plain_storage_cursor
2134                    .seek_by_key_subkey(*address, *storage_key)?
2135                    .filter(|s| s.key == *storage_key)
2136                    .is_some()
2137                {
2138                    plain_storage_cursor.delete_current()?
2139                }
2140
2141                // insert value if needed
2142                if !old_storage_value.is_zero() {
2143                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2144                }
2145            }
2146        }
2147
2148        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2149
2150        Ok(())
2151    }
2152
2153    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2154    ///
2155    /// The latest state will be unwound and returned back with all the blocks
2156    ///
2157    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2158    ///    transaction ids.
2159    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2160    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2161    ///    the changesets.
2162    ///    - In order to have both the old and new values in the changesets, we also access the
2163    ///      plain state tables.
2164    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2165    ///    we:
2166    ///     1. Take the old value from the changeset
2167    ///     2. Take the new value from the plain state
2168    ///     3. Save the old value to the local state
2169    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2170    ///    have seen before we:
2171    ///     1. Take the old value from the changeset
2172    ///     2. Take the new value from the local state
2173    ///     3. Set the local state to the value in the changeset
2174    fn take_state_above(
2175        &self,
2176        block: BlockNumber,
2177        remove_receipts_from: StorageLocation,
2178    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2179        let range = block + 1..=self.last_block_number()?;
2180
2181        if range.is_empty() {
2182            return Ok(ExecutionOutcome::default())
2183        }
2184        let start_block_number = *range.start();
2185
2186        // We are not removing block meta as it is used to get block changesets.
2187        let block_bodies = self.block_body_indices_range(range.clone())?;
2188
2189        // get transaction receipts
2190        let from_transaction_num =
2191            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2192        let to_transaction_num =
2193            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2194
2195        let storage_range = BlockNumberAddress::range(range.clone());
2196
2197        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2198        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2199
2200        // This is not working for blocks that are not at tip. as plain state is not the last
2201        // state of end range. We should rename the functions or add support to access
2202        // History state. Accessing history state can be tricky but we are not gaining
2203        // anything.
2204        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2205        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2206
2207        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2208        // remove, and return later
2209        let (state, reverts) = self.populate_bundle_state(
2210            account_changeset,
2211            storage_changeset,
2212            &mut plain_accounts_cursor,
2213            &mut plain_storage_cursor,
2214        )?;
2215
2216        // iterate over local plain state remove all account and all storages.
2217        for (address, (old_account, new_account, storage)) in &state {
2218            // revert account if needed.
2219            if old_account != new_account {
2220                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2221                if let Some(account) = old_account {
2222                    plain_accounts_cursor.upsert(*address, account)?;
2223                } else if existing_entry.is_some() {
2224                    plain_accounts_cursor.delete_current()?;
2225                }
2226            }
2227
2228            // revert storages
2229            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2230                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2231                // delete previous value
2232                // TODO: This does not use dupsort features
2233                if plain_storage_cursor
2234                    .seek_by_key_subkey(*address, *storage_key)?
2235                    .filter(|s| s.key == *storage_key)
2236                    .is_some()
2237                {
2238                    plain_storage_cursor.delete_current()?
2239                }
2240
2241                // insert value if needed
2242                if !old_storage_value.is_zero() {
2243                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2244                }
2245            }
2246        }
2247
2248        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2249        let mut receipts_iter = self
2250            .static_file_provider
2251            .get_range_with_static_file_or_database(
2252                StaticFileSegment::Receipts,
2253                from_transaction_num..to_transaction_num + 1,
2254                |static_file, range, _| {
2255                    static_file
2256                        .receipts_by_tx_range(range.clone())
2257                        .map(|r| range.into_iter().zip(r).collect())
2258                },
2259                |range, _| {
2260                    self.tx
2261                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2262                        .walk_range(range)?
2263                        .map(|r| r.map_err(Into::into))
2264                        .collect()
2265                },
2266                |_| true,
2267            )?
2268            .into_iter()
2269            .peekable();
2270
2271        let mut receipts = Vec::with_capacity(block_bodies.len());
2272        // loop break if we are at the end of the blocks.
2273        for block_body in block_bodies {
2274            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2275            for num in block_body.tx_num_range() {
2276                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2277                    block_receipts.push(receipts_iter.next().unwrap().1);
2278                }
2279            }
2280            receipts.push(block_receipts);
2281        }
2282
2283        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2284
2285        Ok(ExecutionOutcome::new_init(
2286            state,
2287            reverts,
2288            Vec::new(),
2289            receipts,
2290            start_block_number,
2291            Vec::new(),
2292        ))
2293    }
2294}
2295
2296impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2297    /// Writes trie updates. Returns the number of entries modified.
2298    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2299        if trie_updates.is_empty() {
2300            return Ok(0)
2301        }
2302
2303        // Track the number of inserted entries.
2304        let mut num_entries = 0;
2305
2306        // Merge updated and removed nodes. Updated nodes must take precedence.
2307        let mut account_updates = trie_updates
2308            .removed_nodes_ref()
2309            .iter()
2310            .filter_map(|n| {
2311                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2312            })
2313            .collect::<Vec<_>>();
2314        account_updates.extend(
2315            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2316        );
2317        // Sort trie node updates.
2318        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2319
2320        let tx = self.tx_ref();
2321        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2322        for (key, updated_node) in account_updates {
2323            let nibbles = StoredNibbles(key.clone());
2324            match updated_node {
2325                Some(node) => {
2326                    if !nibbles.0.is_empty() {
2327                        num_entries += 1;
2328                        account_trie_cursor.upsert(nibbles, node)?;
2329                    }
2330                }
2331                None => {
2332                    num_entries += 1;
2333                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2334                        account_trie_cursor.delete_current()?;
2335                    }
2336                }
2337            }
2338        }
2339
2340        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2341
2342        Ok(num_entries)
2343    }
2344}
2345
2346impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2347    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2348    /// updates by the hashed address, writing in sorted order.
2349    fn write_storage_trie_updates(
2350        &self,
2351        storage_tries: &B256Map<StorageTrieUpdates>,
2352    ) -> ProviderResult<usize> {
2353        let mut num_entries = 0;
2354        let mut storage_tries = Vec::from_iter(storage_tries);
2355        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2356        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2357        for (hashed_address, storage_trie_updates) in storage_tries {
2358            let mut db_storage_trie_cursor =
2359                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2360            num_entries +=
2361                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2362            cursor = db_storage_trie_cursor.cursor;
2363        }
2364
2365        Ok(num_entries)
2366    }
2367
2368    fn write_individual_storage_trie_updates(
2369        &self,
2370        hashed_address: B256,
2371        updates: &StorageTrieUpdates,
2372    ) -> ProviderResult<usize> {
2373        if updates.is_empty() {
2374            return Ok(0)
2375        }
2376
2377        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2378        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2379        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2380    }
2381}
2382
2383impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2384    fn unwind_account_hashing<'a>(
2385        &self,
2386        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2387    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2388        // Aggregate all block changesets and make a list of accounts that have been changed.
2389        // Note that collecting and then reversing the order is necessary to ensure that the
2390        // changes are applied in the correct order.
2391        let hashed_accounts = changesets
2392            .into_iter()
2393            .map(|(_, e)| (keccak256(e.address), e.info))
2394            .collect::<Vec<_>>()
2395            .into_iter()
2396            .rev()
2397            .collect::<BTreeMap<_, _>>();
2398
2399        // Apply values to HashedState, and remove the account if it's None.
2400        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2401        for (hashed_address, account) in &hashed_accounts {
2402            if let Some(account) = account {
2403                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2404            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2405                hashed_accounts_cursor.delete_current()?;
2406            }
2407        }
2408
2409        Ok(hashed_accounts)
2410    }
2411
2412    fn unwind_account_hashing_range(
2413        &self,
2414        range: impl RangeBounds<BlockNumber>,
2415    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2416        let changesets = self
2417            .tx
2418            .cursor_read::<tables::AccountChangeSets>()?
2419            .walk_range(range)?
2420            .collect::<Result<Vec<_>, _>>()?;
2421        self.unwind_account_hashing(changesets.iter())
2422    }
2423
2424    fn insert_account_for_hashing(
2425        &self,
2426        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2427    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2428        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2429        let hashed_accounts =
2430            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2431        for (hashed_address, account) in &hashed_accounts {
2432            if let Some(account) = account {
2433                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2434            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2435                hashed_accounts_cursor.delete_current()?;
2436            }
2437        }
2438        Ok(hashed_accounts)
2439    }
2440
2441    fn unwind_storage_hashing(
2442        &self,
2443        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2444    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2445        // Aggregate all block changesets and make list of accounts that have been changed.
2446        let mut hashed_storages = changesets
2447            .into_iter()
2448            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2449                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2450            })
2451            .collect::<Vec<_>>();
2452        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2453
2454        // Apply values to HashedState, and remove the account if it's None.
2455        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2456            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2457        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2458        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2459            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2460
2461            if hashed_storage
2462                .seek_by_key_subkey(hashed_address, key)?
2463                .filter(|entry| entry.key == key)
2464                .is_some()
2465            {
2466                hashed_storage.delete_current()?;
2467            }
2468
2469            if !value.is_zero() {
2470                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2471            }
2472        }
2473        Ok(hashed_storage_keys)
2474    }
2475
2476    fn unwind_storage_hashing_range(
2477        &self,
2478        range: impl RangeBounds<BlockNumberAddress>,
2479    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2480        let changesets = self
2481            .tx
2482            .cursor_read::<tables::StorageChangeSets>()?
2483            .walk_range(range)?
2484            .collect::<Result<Vec<_>, _>>()?;
2485        self.unwind_storage_hashing(changesets.into_iter())
2486    }
2487
2488    fn insert_storage_for_hashing(
2489        &self,
2490        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2491    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2492        // hash values
2493        let hashed_storages =
2494            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2495                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2496                    map.insert(keccak256(entry.key), entry.value);
2497                    map
2498                });
2499                map.insert(keccak256(address), storage);
2500                map
2501            });
2502
2503        let hashed_storage_keys = hashed_storages
2504            .iter()
2505            .map(|(hashed_address, entries)| (*hashed_address, entries.keys().copied().collect()))
2506            .collect();
2507
2508        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2509        // Hash the address and key and apply them to HashedStorage (if Storage is None
2510        // just remove it);
2511        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2512            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2513                if hashed_storage_cursor
2514                    .seek_by_key_subkey(hashed_address, key)?
2515                    .filter(|entry| entry.key == key)
2516                    .is_some()
2517                {
2518                    hashed_storage_cursor.delete_current()?;
2519                }
2520
2521                if !value.is_zero() {
2522                    hashed_storage_cursor.upsert(hashed_address, &StorageEntry { key, value })?;
2523                }
2524                Ok(())
2525            })
2526        })?;
2527
2528        Ok(hashed_storage_keys)
2529    }
2530
2531    fn insert_hashes(
2532        &self,
2533        range: RangeInclusive<BlockNumber>,
2534        end_block_hash: B256,
2535        expected_state_root: B256,
2536    ) -> ProviderResult<()> {
2537        // Initialize prefix sets.
2538        let mut account_prefix_set = PrefixSetMut::default();
2539        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2540        let mut destroyed_accounts = HashSet::default();
2541
2542        let mut durations_recorder = metrics::DurationsRecorder::default();
2543
2544        // storage hashing stage
2545        {
2546            let lists = self.changed_storages_with_range(range.clone())?;
2547            let storages = self.plain_state_storages(lists)?;
2548            let storage_entries = self.insert_storage_for_hashing(storages)?;
2549            for (hashed_address, hashed_slots) in storage_entries {
2550                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2551                for slot in hashed_slots {
2552                    storage_prefix_sets
2553                        .entry(hashed_address)
2554                        .or_default()
2555                        .insert(Nibbles::unpack(slot));
2556                }
2557            }
2558        }
2559        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2560
2561        // account hashing stage
2562        {
2563            let lists = self.changed_accounts_with_range(range.clone())?;
2564            let accounts = self.basic_accounts(lists)?;
2565            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2566            for (hashed_address, account) in hashed_addresses {
2567                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2568                if account.is_none() {
2569                    destroyed_accounts.insert(hashed_address);
2570                }
2571            }
2572        }
2573        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2574
2575        // merkle tree
2576        {
2577            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2578            // are pre-loaded.
2579            let prefix_sets = TriePrefixSets {
2580                account_prefix_set: account_prefix_set.freeze(),
2581                storage_prefix_sets: storage_prefix_sets
2582                    .into_iter()
2583                    .map(|(k, v)| (k, v.freeze()))
2584                    .collect(),
2585                destroyed_accounts,
2586            };
2587            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2588                .with_prefix_sets(prefix_sets)
2589                .root_with_updates()
2590                .map_err(reth_db_api::DatabaseError::from)?;
2591            if state_root != expected_state_root {
2592                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2593                    root: GotExpected { got: state_root, expected: expected_state_root },
2594                    block_number: *range.end(),
2595                    block_hash: end_block_hash,
2596                })))
2597            }
2598            self.write_trie_updates(&trie_updates)?;
2599        }
2600        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2601
2602        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2603
2604        Ok(())
2605    }
2606}
2607
2608impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2609    fn unwind_account_history_indices<'a>(
2610        &self,
2611        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2612    ) -> ProviderResult<usize> {
2613        let mut last_indices = changesets
2614            .into_iter()
2615            .map(|(index, account)| (account.address, *index))
2616            .collect::<Vec<_>>();
2617        last_indices.sort_by_key(|(a, _)| *a);
2618
2619        // Unwind the account history index.
2620        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2621        for &(address, rem_index) in &last_indices {
2622            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2623                &mut cursor,
2624                ShardedKey::last(address),
2625                rem_index,
2626                |sharded_key| sharded_key.key == address,
2627            )?;
2628
2629            // Check the last returned partial shard.
2630            // If it's not empty, the shard needs to be reinserted.
2631            if !partial_shard.is_empty() {
2632                cursor.insert(
2633                    ShardedKey::last(address),
2634                    &BlockNumberList::new_pre_sorted(partial_shard),
2635                )?;
2636            }
2637        }
2638
2639        let changesets = last_indices.len();
2640        Ok(changesets)
2641    }
2642
2643    fn unwind_account_history_indices_range(
2644        &self,
2645        range: impl RangeBounds<BlockNumber>,
2646    ) -> ProviderResult<usize> {
2647        let changesets = self
2648            .tx
2649            .cursor_read::<tables::AccountChangeSets>()?
2650            .walk_range(range)?
2651            .collect::<Result<Vec<_>, _>>()?;
2652        self.unwind_account_history_indices(changesets.iter())
2653    }
2654
2655    fn insert_account_history_index(
2656        &self,
2657        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2658    ) -> ProviderResult<()> {
2659        self.append_history_index::<_, tables::AccountsHistory>(
2660            account_transitions,
2661            ShardedKey::new,
2662        )
2663    }
2664
2665    fn unwind_storage_history_indices(
2666        &self,
2667        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2668    ) -> ProviderResult<usize> {
2669        let mut storage_changesets = changesets
2670            .into_iter()
2671            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2672            .collect::<Vec<_>>();
2673        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2674
2675        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2676        for &(address, storage_key, rem_index) in &storage_changesets {
2677            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2678                &mut cursor,
2679                StorageShardedKey::last(address, storage_key),
2680                rem_index,
2681                |storage_sharded_key| {
2682                    storage_sharded_key.address == address &&
2683                        storage_sharded_key.sharded_key.key == storage_key
2684                },
2685            )?;
2686
2687            // Check the last returned partial shard.
2688            // If it's not empty, the shard needs to be reinserted.
2689            if !partial_shard.is_empty() {
2690                cursor.insert(
2691                    StorageShardedKey::last(address, storage_key),
2692                    &BlockNumberList::new_pre_sorted(partial_shard),
2693                )?;
2694            }
2695        }
2696
2697        let changesets = storage_changesets.len();
2698        Ok(changesets)
2699    }
2700
2701    fn unwind_storage_history_indices_range(
2702        &self,
2703        range: impl RangeBounds<BlockNumberAddress>,
2704    ) -> ProviderResult<usize> {
2705        let changesets = self
2706            .tx
2707            .cursor_read::<tables::StorageChangeSets>()?
2708            .walk_range(range)?
2709            .collect::<Result<Vec<_>, _>>()?;
2710        self.unwind_storage_history_indices(changesets.into_iter())
2711    }
2712
2713    fn insert_storage_history_index(
2714        &self,
2715        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2716    ) -> ProviderResult<()> {
2717        self.append_history_index::<_, tables::StoragesHistory>(
2718            storage_transitions,
2719            |(address, storage_key), highest_block_number| {
2720                StorageShardedKey::new(address, storage_key, highest_block_number)
2721            },
2722        )
2723    }
2724
2725    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2726        // account history stage
2727        {
2728            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2729            self.insert_account_history_index(indices)?;
2730        }
2731
2732        // storage history stage
2733        {
2734            let indices = self.changed_storages_and_blocks_with_range(range)?;
2735            self.insert_storage_history_index(indices)?;
2736        }
2737
2738        Ok(())
2739    }
2740}
2741
2742impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2743    for DatabaseProvider<TX, N>
2744{
2745    fn take_block_and_execution_above(
2746        &self,
2747        block: BlockNumber,
2748        remove_from: StorageLocation,
2749    ) -> ProviderResult<Chain<Self::Primitives>> {
2750        let range = block + 1..=self.last_block_number()?;
2751
2752        self.unwind_trie_state_range(range.clone())?;
2753
2754        // get execution res
2755        let execution_state = self.take_state_above(block, remove_from)?;
2756
2757        let blocks = self.recovered_block_range(range)?;
2758
2759        // remove block bodies it is needed for both get block range and get block execution results
2760        // that is why it is deleted afterwards.
2761        self.remove_blocks_above(block, remove_from)?;
2762
2763        // Update pipeline progress
2764        self.update_pipeline_stages(block, true)?;
2765
2766        Ok(Chain::new(blocks, execution_state, None))
2767    }
2768
2769    fn remove_block_and_execution_above(
2770        &self,
2771        block: BlockNumber,
2772        remove_from: StorageLocation,
2773    ) -> ProviderResult<()> {
2774        let range = block + 1..=self.last_block_number()?;
2775
2776        self.unwind_trie_state_range(range)?;
2777
2778        // remove execution res
2779        self.remove_state_above(block, remove_from)?;
2780
2781        // remove block bodies it is needed for both get block range and get block execution results
2782        // that is why it is deleted afterwards.
2783        self.remove_blocks_above(block, remove_from)?;
2784
2785        // Update pipeline progress
2786        self.update_pipeline_stages(block, true)?;
2787
2788        Ok(())
2789    }
2790}
2791
2792impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2793    for DatabaseProvider<TX, N>
2794{
2795    type Block = BlockTy<N>;
2796    type Receipt = ReceiptTy<N>;
2797
2798    /// Inserts the block into the database, always modifying the following tables:
2799    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2800    /// * [`Headers`](tables::Headers)
2801    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2802    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2803    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2804    ///
2805    /// If there are transactions in the block, the following tables will be modified:
2806    /// * [`Transactions`](tables::Transactions)
2807    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2808    ///
2809    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2810    /// If withdrawals are not empty, this will modify
2811    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2812    ///
2813    /// If the provider has __not__ configured full sender pruning, this will modify
2814    /// [`TransactionSenders`](tables::TransactionSenders).
2815    ///
2816    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2817    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2818    fn insert_block(
2819        &self,
2820        block: RecoveredBlock<Self::Block>,
2821        write_to: StorageLocation,
2822    ) -> ProviderResult<StoredBlockBodyIndices> {
2823        let block_number = block.number();
2824
2825        let mut durations_recorder = metrics::DurationsRecorder::default();
2826
2827        // total difficulty
2828        let ttd = if block_number == 0 {
2829            block.header().difficulty()
2830        } else {
2831            let parent_block_number = block_number - 1;
2832            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2833            durations_recorder.record_relative(metrics::Action::GetParentTD);
2834            parent_ttd + block.header().difficulty()
2835        };
2836
2837        if write_to.database() {
2838            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2839            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2840
2841            // Put header with canonical hashes.
2842            self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2843            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2844
2845            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2846            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2847        }
2848
2849        if write_to.static_files() {
2850            let mut writer =
2851                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2852            writer.append_header(block.header(), ttd, &block.hash())?;
2853        }
2854
2855        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2856        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2857
2858        let mut next_tx_num = self
2859            .tx
2860            .cursor_read::<tables::TransactionBlocks>()?
2861            .last()?
2862            .map(|(n, _)| n + 1)
2863            .unwrap_or_default();
2864        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2865        let first_tx_num = next_tx_num;
2866
2867        let tx_count = block.body().transaction_count() as u64;
2868
2869        // Ensures we have all the senders for the block's transactions.
2870        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2871            let hash = transaction.tx_hash();
2872
2873            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2874                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2875            }
2876
2877            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2878                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2879            }
2880            next_tx_num += 1;
2881        }
2882
2883        self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2884
2885        debug!(
2886            target: "providers::db",
2887            ?block_number,
2888            actions = ?durations_recorder.actions,
2889            "Inserted block"
2890        );
2891
2892        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2893    }
2894
2895    fn append_block_bodies(
2896        &self,
2897        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2898        write_to: StorageLocation,
2899    ) -> ProviderResult<()> {
2900        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2901
2902        // Initialize writer if we will be writing transactions to staticfiles
2903        let mut tx_static_writer = write_to
2904            .static_files()
2905            .then(|| {
2906                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2907            })
2908            .transpose()?;
2909
2910        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2911        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2912
2913        // Initialize cursor if we will be writing transactions to database
2914        let mut tx_cursor = write_to
2915            .database()
2916            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2917            .transpose()?;
2918
2919        // Get id for the next tx_num or zero if there are no transactions.
2920        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2921
2922        for (block_number, body) in &bodies {
2923            // Increment block on static file header.
2924            if let Some(writer) = tx_static_writer.as_mut() {
2925                writer.increment_block(*block_number)?;
2926            }
2927
2928            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2929            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2930
2931            let mut durations_recorder = metrics::DurationsRecorder::default();
2932
2933            // insert block meta
2934            block_indices_cursor.append(*block_number, &block_indices)?;
2935
2936            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2937
2938            let Some(body) = body else { continue };
2939
2940            // write transaction block index
2941            if !body.transactions().is_empty() {
2942                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2943                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2944            }
2945
2946            // write transactions
2947            for transaction in body.transactions() {
2948                if let Some(writer) = tx_static_writer.as_mut() {
2949                    writer.append_transaction(next_tx_num, transaction)?;
2950                }
2951                if let Some(cursor) = tx_cursor.as_mut() {
2952                    cursor.append(next_tx_num, transaction)?;
2953                }
2954
2955                // Increment transaction id for each transaction.
2956                next_tx_num += 1;
2957            }
2958
2959            debug!(
2960                target: "providers::db",
2961                ?block_number,
2962                actions = ?durations_recorder.actions,
2963                "Inserted block body"
2964            );
2965        }
2966
2967        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2968
2969        Ok(())
2970    }
2971
2972    fn remove_blocks_above(
2973        &self,
2974        block: BlockNumber,
2975        remove_from: StorageLocation,
2976    ) -> ProviderResult<()> {
2977        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2978            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2979        }
2980
2981        // Only prune canonical headers after we've removed the block hashes as we rely on data from
2982        // this table in `canonical_hashes_range`.
2983        self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2984        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2985        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2986
2987        // First transaction to be removed
2988        let unwind_tx_from = self
2989            .block_body_indices(block)?
2990            .map(|b| b.next_tx_num())
2991            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2992
2993        // Last transaction to be removed
2994        let unwind_tx_to = self
2995            .tx
2996            .cursor_read::<tables::BlockBodyIndices>()?
2997            .last()?
2998            // shouldn't happen because this was OK above
2999            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3000            .1
3001            .last_tx_num();
3002
3003        if unwind_tx_from <= unwind_tx_to {
3004            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3005                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3006            }
3007        }
3008
3009        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3010
3011        self.remove_bodies_above(block, remove_from)?;
3012
3013        Ok(())
3014    }
3015
3016    fn remove_bodies_above(
3017        &self,
3018        block: BlockNumber,
3019        remove_from: StorageLocation,
3020    ) -> ProviderResult<()> {
3021        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3022
3023        // First transaction to be removed
3024        let unwind_tx_from = self
3025            .block_body_indices(block)?
3026            .map(|b| b.next_tx_num())
3027            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3028
3029        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3030        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3031
3032        if remove_from.database() {
3033            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3034        }
3035
3036        if remove_from.static_files() {
3037            let static_file_tx_num = self
3038                .static_file_provider
3039                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3040
3041            let to_delete = static_file_tx_num
3042                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3043                .unwrap_or_default();
3044
3045            self.static_file_provider
3046                .latest_writer(StaticFileSegment::Transactions)?
3047                .prune_transactions(to_delete, block)?;
3048        }
3049
3050        Ok(())
3051    }
3052
3053    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3054    fn append_blocks_with_state(
3055        &self,
3056        blocks: Vec<RecoveredBlock<Self::Block>>,
3057        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3058        hashed_state: HashedPostStateSorted,
3059        trie_updates: TrieUpdates,
3060    ) -> ProviderResult<()> {
3061        if blocks.is_empty() {
3062            debug!(target: "providers::db", "Attempted to append empty block range");
3063            return Ok(())
3064        }
3065
3066        let first_number = blocks.first().unwrap().number();
3067
3068        let last = blocks.last().unwrap();
3069        let last_block_number = last.number();
3070
3071        let mut durations_recorder = metrics::DurationsRecorder::default();
3072
3073        // Insert the blocks
3074        for block in blocks {
3075            self.insert_block(block, StorageLocation::Database)?;
3076            durations_recorder.record_relative(metrics::Action::InsertBlock);
3077        }
3078
3079        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3080        durations_recorder.record_relative(metrics::Action::InsertState);
3081
3082        // insert hashes and intermediate merkle nodes
3083        self.write_hashed_state(&hashed_state)?;
3084        self.write_trie_updates(&trie_updates)?;
3085        durations_recorder.record_relative(metrics::Action::InsertHashes);
3086
3087        self.update_history_indices(first_number..=last_block_number)?;
3088        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3089
3090        // Update pipeline progress
3091        self.update_pipeline_stages(last_block_number, false)?;
3092        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3093
3094        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3095
3096        Ok(())
3097    }
3098}
3099
3100impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3101    fn get_prune_checkpoint(
3102        &self,
3103        segment: PruneSegment,
3104    ) -> ProviderResult<Option<PruneCheckpoint>> {
3105        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3106    }
3107
3108    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3109        Ok(self
3110            .tx
3111            .cursor_read::<tables::PruneCheckpoints>()?
3112            .walk(None)?
3113            .collect::<Result<_, _>>()?)
3114    }
3115}
3116
3117impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3118    fn save_prune_checkpoint(
3119        &self,
3120        segment: PruneSegment,
3121        checkpoint: PruneCheckpoint,
3122    ) -> ProviderResult<()> {
3123        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3124    }
3125}
3126
3127impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3128    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3129        let db_entries = self.tx.entries::<T>()?;
3130        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3131            Ok(entries) => entries,
3132            Err(ProviderError::UnsupportedProvider) => 0,
3133            Err(err) => return Err(err),
3134        };
3135
3136        Ok(db_entries + static_file_entries)
3137    }
3138}
3139
3140impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3141    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3142        let mut finalized_blocks = self
3143            .tx
3144            .cursor_read::<tables::ChainState>()?
3145            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3146            .take(1)
3147            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3148
3149        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3150        Ok(last_finalized_block_number)
3151    }
3152
3153    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3154        let mut finalized_blocks = self
3155            .tx
3156            .cursor_read::<tables::ChainState>()?
3157            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3158            .take(1)
3159            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3160
3161        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3162        Ok(last_finalized_block_number)
3163    }
3164}
3165
3166impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3167    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3168        Ok(self
3169            .tx
3170            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3171    }
3172
3173    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174        Ok(self
3175            .tx
3176            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3177    }
3178}
3179
3180impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3181    type Tx = TX;
3182
3183    fn tx_ref(&self) -> &Self::Tx {
3184        &self.tx
3185    }
3186
3187    fn tx_mut(&mut self) -> &mut Self::Tx {
3188        &mut self.tx
3189    }
3190
3191    fn into_tx(self) -> Self::Tx {
3192        self.tx
3193    }
3194
3195    fn prune_modes_ref(&self) -> &PruneModes {
3196        self.prune_modes_ref()
3197    }
3198}