// reth_provider/providers/consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4    BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7    TransactionsProvider, TrieReader,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12    HashOrNumber,
13};
14use alloy_primitives::{
15    map::{hash_map, HashMap},
16    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_storage_api::{
27    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
28    StorageChangeSetReader, TryIntoHistoricalStateProvider,
29};
30use reth_storage_errors::provider::ProviderResult;
31use reth_trie::updates::TrieUpdatesSorted;
32use revm_database::states::PlainStorageRevert;
33use std::{
34    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
35    sync::Arc,
36};
37use tracing::trace;
38
39/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
40/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
41/// this provider.
42///
43/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
44/// time-out.
45#[derive(Debug)]
46#[doc(hidden)] // triggers ICE for `cargo docs`
47pub struct ConsistentProvider<N: ProviderNodeTypes> {
48    /// Storage provider.
49    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
50    /// Head block at time of [`Self`] creation
51    head_block: Option<Arc<BlockState<N::Primitives>>>,
52    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
53    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
54}
55
56impl<N: ProviderNodeTypes> ConsistentProvider<N> {
57    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
58    ///
59    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
60    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
61    /// view of memory and database.
62    pub fn new(
63        storage_provider_factory: ProviderFactory<N>,
64        state: CanonicalInMemoryState<N::Primitives>,
65    ) -> ProviderResult<Self> {
66        // Each one provides a snapshot at the time of instantiation, but its order matters.
67        //
68        // If we acquire first the database provider, it's possible that before the in-memory chain
69        // snapshot is instantiated, it will flush blocks to disk. This would
70        // mean that our database provider would not have access to the flushed blocks (since it's
71        // working under an older view), while the in-memory state may have deleted them
72        // entirely. Resulting in gaps on the range.
73        let head_block = state.head_state();
74        let storage_provider = storage_provider_factory.database_provider_ro()?;
75        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
76    }
77
78    // Helper function to convert range bounds
79    fn convert_range_bounds<T>(
80        &self,
81        range: impl RangeBounds<T>,
82        end_unbounded: impl FnOnce() -> T,
83    ) -> (T, T)
84    where
85        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
86    {
87        let start = match range.start_bound() {
88            Bound::Included(&n) => n,
89            Bound::Excluded(&n) => n + T::from(1u8),
90            Bound::Unbounded => T::from(0u8),
91        };
92
93        let end = match range.end_bound() {
94            Bound::Included(&n) => n,
95            Bound::Excluded(&n) => n - T::from(1u8),
96            Bound::Unbounded => end_unbounded(),
97        };
98
99        (start, end)
100    }
101
102    /// Storage provider for latest block
103    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
104        trace!(target: "providers::blockchain", "Getting latest block state provider");
105
106        // use latest state provider if the head state exists
107        if let Some(state) = &self.head_block {
108            trace!(target: "providers::blockchain", "Using head state for latest state provider");
109            Ok(self.block_state_provider_ref(state)?.boxed())
110        } else {
111            trace!(target: "providers::blockchain", "Using database state for latest state provider");
112            Ok(self.storage_provider.latest())
113        }
114    }
115
116    fn history_by_block_hash_ref<'a>(
117        &'a self,
118        block_hash: BlockHash,
119    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
120        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
121
122        self.get_in_memory_or_storage_by_block(
123            block_hash.into(),
124            |_| self.storage_provider.history_by_block_hash(block_hash),
125            |block_state| {
126                let state_provider = self.block_state_provider_ref(block_state)?;
127                Ok(Box::new(state_provider))
128            },
129        )
130    }
131
132    /// Returns a state provider indexed by the given block number or tag.
133    fn state_by_block_number_ref<'a>(
134        &'a self,
135        number: BlockNumber,
136    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
137        let hash =
138            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
139        self.history_by_block_hash_ref(hash)
140    }
141
142    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
143    ///
144    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
145    pub fn get_state(
146        &self,
147        range: RangeInclusive<BlockNumber>,
148    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
149        if range.is_empty() {
150            return Ok(None)
151        }
152        let start_block_number = *range.start();
153        let end_block_number = *range.end();
154
155        // We are not removing block meta as it is used to get block changesets.
156        let mut block_bodies = Vec::new();
157        for block_num in range.clone() {
158            let block_body = self
159                .block_body_indices(block_num)?
160                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
161            block_bodies.push((block_num, block_body))
162        }
163
164        // get transaction receipts
165        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
166        else {
167            return Ok(None)
168        };
169        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
170            return Ok(None)
171        };
172
173        let mut account_changeset = Vec::new();
174        for block_num in range.clone() {
175            let changeset =
176                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
177            account_changeset.extend(changeset);
178        }
179
180        let mut storage_changeset = Vec::new();
181        for block_num in range {
182            let changeset = self.storage_changeset(block_num)?;
183            storage_changeset.extend(changeset);
184        }
185
186        let (state, reverts) =
187            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
188
189        let mut receipt_iter =
190            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
191
192        let mut receipts = Vec::with_capacity(block_bodies.len());
193        // loop break if we are at the end of the blocks.
194        for (_, block_body) in block_bodies {
195            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
196            for tx_num in block_body.tx_num_range() {
197                let receipt = receipt_iter
198                    .next()
199                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
200                block_receipts.push(receipt);
201            }
202            receipts.push(block_receipts);
203        }
204
205        Ok(Some(ExecutionOutcome::new_init(
206            state,
207            reverts,
208            // We skip new contracts since we never delete them from the database
209            Vec::new(),
210            receipts,
211            start_block_number,
212            Vec::new(),
213        )))
214    }
215
216    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
217    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
218    /// storage and account changesets.
219    fn populate_bundle_state(
220        &self,
221        account_changeset: Vec<(u64, AccountBeforeTx)>,
222        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
223        block_range_end: BlockNumber,
224    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
225        let mut state: BundleStateInit = HashMap::default();
226        let mut reverts: RevertsInit = HashMap::default();
227        let state_provider = self.state_by_block_number_ref(block_range_end)?;
228
229        // add account changeset changes
230        for (block_number, account_before) in account_changeset.into_iter().rev() {
231            let AccountBeforeTx { info: old_info, address } = account_before;
232            match state.entry(address) {
233                hash_map::Entry::Vacant(entry) => {
234                    let new_info = state_provider.basic_account(&address)?;
235                    entry.insert((old_info, new_info, HashMap::default()));
236                }
237                hash_map::Entry::Occupied(mut entry) => {
238                    // overwrite old account state.
239                    entry.get_mut().0 = old_info;
240                }
241            }
242            // insert old info into reverts.
243            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
244        }
245
246        // add storage changeset changes
247        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
248            let BlockNumberAddress((block_number, address)) = block_and_address;
249            // get account state or insert from plain state.
250            let account_state = match state.entry(address) {
251                hash_map::Entry::Vacant(entry) => {
252                    let present_info = state_provider.basic_account(&address)?;
253                    entry.insert((present_info, present_info, HashMap::default()))
254                }
255                hash_map::Entry::Occupied(entry) => entry.into_mut(),
256            };
257
258            // match storage.
259            match account_state.2.entry(old_storage.key) {
260                hash_map::Entry::Vacant(entry) => {
261                    let new_storage_value =
262                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
263                    entry.insert((old_storage.value, new_storage_value));
264                }
265                hash_map::Entry::Occupied(mut entry) => {
266                    entry.get_mut().0 = old_storage.value;
267                }
268            };
269
270            reverts
271                .entry(block_number)
272                .or_default()
273                .entry(address)
274                .or_default()
275                .1
276                .push(old_storage);
277        }
278
279        Ok((state, reverts))
280    }
281
282    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
283    /// is met.
284    ///
285    /// Creates a snapshot of the in-memory chain state and database provider to prevent
286    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
287    /// recent in-memory blocks in case of overlaps.
288    ///
289    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
290    ///   user to retrieve the required items from the database using [`RangeInclusive`].
291    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
292    ///   state, allowing for selection or filtering for the desired data.
293    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
294        &self,
295        range: impl RangeBounds<BlockNumber>,
296        fetch_db_range: F,
297        map_block_state_item: G,
298        mut predicate: P,
299    ) -> ProviderResult<Vec<T>>
300    where
301        F: FnOnce(
302            &DatabaseProviderRO<N::DB, N>,
303            RangeInclusive<BlockNumber>,
304            &mut P,
305        ) -> ProviderResult<Vec<T>>,
306        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
307        P: FnMut(&T) -> bool,
308    {
309        // Each one provides a snapshot at the time of instantiation, but its order matters.
310        //
311        // If we acquire first the database provider, it's possible that before the in-memory chain
312        // snapshot is instantiated, it will flush blocks to disk. This would
313        // mean that our database provider would not have access to the flushed blocks (since it's
314        // working under an older view), while the in-memory state may have deleted them
315        // entirely. Resulting in gaps on the range.
316        let mut in_memory_chain =
317            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
318        let db_provider = &self.storage_provider;
319
320        let (start, end) = self.convert_range_bounds(range, || {
321            // the first block is the highest one.
322            in_memory_chain
323                .first()
324                .map(|b| b.number())
325                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
326        });
327
328        if start > end {
329            return Ok(vec![])
330        }
331
332        // Split range into storage_range and in-memory range. If the in-memory range is not
333        // necessary drop it early.
334        //
335        // The last block of `in_memory_chain` is the lowest block number.
336        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
337            Some(lowest_memory_block) if lowest_memory_block <= end => {
338                let highest_memory_block =
339                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
340
341                // Database will for a time overlap with in-memory-chain blocks. In
342                // case of a re-org, it can mean that the database blocks are of a forked chain, and
343                // so, we should prioritize the in-memory overlapped blocks.
344                let in_memory_range =
345                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
346
347                // If requested range is in the middle of the in-memory range, remove the necessary
348                // lowest blocks
349                in_memory_chain.truncate(
350                    in_memory_chain
351                        .len()
352                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
353                );
354
355                let storage_range =
356                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
357
358                (Some((in_memory_chain, in_memory_range)), storage_range)
359            }
360            _ => {
361                // Drop the in-memory chain so we don't hold blocks in memory.
362                drop(in_memory_chain);
363
364                (None, Some(start..=end))
365            }
366        };
367
368        let mut items = Vec::with_capacity((end - start + 1) as usize);
369
370        if let Some(storage_range) = storage_range {
371            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
372            items.append(&mut db_items);
373
374            // The predicate was not met, if the number of items differs from the expected. So, we
375            // return what we have.
376            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
377                return Ok(items)
378            }
379        }
380
381        if let Some((in_memory_chain, in_memory_range)) = in_memory {
382            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
383                debug_assert!(num == block.number());
384                if let Some(item) = map_block_state_item(block, &mut predicate) {
385                    items.push(item);
386                } else {
387                    break
388                }
389            }
390        }
391
392        Ok(items)
393    }
394
395    /// This uses a given [`BlockState`] to initialize a state provider for that block.
396    fn block_state_provider_ref(
397        &self,
398        state: &BlockState<N::Primitives>,
399    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
400        let anchor_hash = state.anchor().hash;
401        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
402        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
403        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
404    }
405
406    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
407    ///
408    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
409    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
410    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
411    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
412        &self,
413        range: impl RangeBounds<BlockNumber>,
414        fetch_from_db: S,
415        fetch_from_block_state: M,
416    ) -> ProviderResult<Vec<R>>
417    where
418        S: FnOnce(
419            &DatabaseProviderRO<N::DB, N>,
420            RangeInclusive<TxNumber>,
421        ) -> ProviderResult<Vec<R>>,
422        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
423    {
424        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
425        let provider = &self.storage_provider;
426
427        // Get the last block number stored in the storage which does NOT overlap with in-memory
428        // chain.
429        let last_database_block_number = in_mem_chain
430            .last()
431            .map(|b| Ok(b.anchor().number))
432            .unwrap_or_else(|| provider.last_block_number())?;
433
434        // Get the next tx number for the last block stored in the storage, which marks the start of
435        // the in-memory state.
436        let last_block_body_index = provider
437            .block_body_indices(last_database_block_number)?
438            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
439        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
440
441        let (start, end) = self.convert_range_bounds(range, || {
442            in_mem_chain
443                .iter()
444                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
445                .sum::<u64>() +
446                last_block_body_index.last_tx_num()
447        });
448
449        if start > end {
450            return Ok(vec![])
451        }
452
453        let mut tx_range = start..=end;
454
455        // If the range is entirely before the first in-memory transaction number, fetch from
456        // storage
457        if *tx_range.end() < in_memory_tx_num {
458            return fetch_from_db(provider, tx_range);
459        }
460
461        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
462
463        // If the range spans storage and memory, get elements from storage first.
464        if *tx_range.start() < in_memory_tx_num {
465            // Determine the range that needs to be fetched from storage.
466            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
467
468            // Set the remaining transaction range for in-memory
469            tx_range = in_memory_tx_num..=*tx_range.end();
470
471            items.extend(fetch_from_db(provider, db_range)?);
472        }
473
474        // Iterate from the lowest block to the highest in-memory chain
475        for block_state in in_mem_chain.iter().rev() {
476            let block_tx_count =
477                block_state.block_ref().recovered_block().body().transactions().len();
478            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
479
480            // If the transaction range start is equal or higher than the next block first
481            // transaction, advance
482            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
483                in_memory_tx_num += block_tx_count as u64;
484                continue
485            }
486
487            // This should only be more than 0 once, in case of a partial range inside a block.
488            let skip = (tx_range.start() - in_memory_tx_num) as usize;
489
490            items.extend(fetch_from_block_state(
491                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
492                block_state,
493            )?);
494
495            in_memory_tx_num += block_tx_count as u64;
496
497            // Break if the range has been fully processed
498            if in_memory_tx_num > *tx_range.end() {
499                break
500            }
501
502            // Set updated range
503            tx_range = in_memory_tx_num..=*tx_range.end();
504        }
505
506        Ok(items)
507    }
508
509    /// Fetches data from either in-memory state or persistent storage by transaction
510    /// [`HashOrNumber`].
511    fn get_in_memory_or_storage_by_tx<S, M, R>(
512        &self,
513        id: HashOrNumber,
514        fetch_from_db: S,
515        fetch_from_block_state: M,
516    ) -> ProviderResult<Option<R>>
517    where
518        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
519        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
520    {
521        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
522        let provider = &self.storage_provider;
523
524        // Get the last block number stored in the database which does NOT overlap with in-memory
525        // chain.
526        let last_database_block_number = in_mem_chain
527            .last()
528            .map(|b| Ok(b.anchor().number))
529            .unwrap_or_else(|| provider.last_block_number())?;
530
531        // Get the next tx number for the last block stored in the database and consider it the
532        // first tx number of the in-memory state
533        let last_block_body_index = provider
534            .block_body_indices(last_database_block_number)?
535            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
536        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
537
538        // If the transaction number is less than the first in-memory transaction number, make a
539        // database lookup
540        if let HashOrNumber::Number(id) = id &&
541            id < in_memory_tx_num
542        {
543            return fetch_from_db(provider)
544        }
545
546        // Iterate from the lowest block to the highest
547        for block_state in in_mem_chain.iter().rev() {
548            let executed_block = block_state.block_ref();
549            let block = executed_block.recovered_block();
550
551            for tx_index in 0..block.body().transactions().len() {
552                match id {
553                    HashOrNumber::Hash(tx_hash) => {
554                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
555                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
556                        }
557                    }
558                    HashOrNumber::Number(id) => {
559                        if id == in_memory_tx_num {
560                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
561                        }
562                    }
563                }
564
565                in_memory_tx_num += 1;
566            }
567        }
568
569        // Not found in-memory, so check database.
570        if let HashOrNumber::Hash(_) = id {
571            return fetch_from_db(provider)
572        }
573
574        Ok(None)
575    }
576
577    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
578    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
579        &self,
580        id: BlockHashOrNumber,
581        fetch_from_db: S,
582        fetch_from_block_state: M,
583    ) -> ProviderResult<R>
584    where
585        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
586        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
587    {
588        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
589            return fetch_from_block_state(block_state)
590        }
591        fetch_from_db(&self.storage_provider)
592    }
593
594    /// Consumes the provider and returns a state provider for the specific block hash.
595    pub(crate) fn into_state_provider_at_block_hash(
596        self,
597        block_hash: BlockHash,
598    ) -> ProviderResult<Box<dyn StateProvider>> {
599        let Self { storage_provider, head_block, .. } = self;
600        let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
601            let block_number = storage_provider
602                .block_number(block_hash)?
603                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
604            storage_provider.try_into_history_at_block(block_number)
605        };
606        if let Some(Some(block_state)) =
607            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
608        {
609            let anchor_hash = block_state.anchor().hash;
610            let latest_historical = into_history_at_block_hash(anchor_hash)?;
611            return Ok(Box::new(block_state.state_provider(latest_historical)));
612        }
613        into_history_at_block_hash(block_hash)
614    }
615}
616
617impl<N: ProviderNodeTypes> ConsistentProvider<N> {
618    /// Ensures that the given block number is canonical (synced)
619    ///
620    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
621    /// out of range and would lead to invalid results, mainly during initial sync.
622    ///
623    /// Verifying the `block_number` would be expensive since we need to lookup sync table
624    /// Instead, we ensure that the `block_number` is within the range of the
625    /// [`Self::best_block_number`] which is updated when a block is synced.
626    #[inline]
627    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
628        let latest = self.best_block_number()?;
629        if block_number > latest {
630            Err(ProviderError::HeaderNotFound(block_number.into()))
631        } else {
632            Ok(())
633        }
634    }
635}
636
637impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
638    type Primitives = N::Primitives;
639}
640
641impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
642    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
643        self.storage_provider.static_file_provider()
644    }
645}
646
647impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
648    type Header = HeaderTy<N>;
649
650    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
651        self.get_in_memory_or_storage_by_block(
652            block_hash.into(),
653            |db_provider| db_provider.header(block_hash),
654            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
655        )
656    }
657
658    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
659        self.get_in_memory_or_storage_by_block(
660            num.into(),
661            |db_provider| db_provider.header_by_number(num),
662            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
663        )
664    }
665
666    fn headers_range(
667        &self,
668        range: impl RangeBounds<BlockNumber>,
669    ) -> ProviderResult<Vec<Self::Header>> {
670        self.get_in_memory_or_storage_by_block_range_while(
671            range,
672            |db_provider, range, _| db_provider.headers_range(range),
673            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
674            |_| true,
675        )
676    }
677
678    fn sealed_header(
679        &self,
680        number: BlockNumber,
681    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
682        self.get_in_memory_or_storage_by_block(
683            number.into(),
684            |db_provider| db_provider.sealed_header(number),
685            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
686        )
687    }
688
689    fn sealed_headers_range(
690        &self,
691        range: impl RangeBounds<BlockNumber>,
692    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
693        self.get_in_memory_or_storage_by_block_range_while(
694            range,
695            |db_provider, range, _| db_provider.sealed_headers_range(range),
696            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
697            |_| true,
698        )
699    }
700
701    fn sealed_headers_while(
702        &self,
703        range: impl RangeBounds<BlockNumber>,
704        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
705    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
706        self.get_in_memory_or_storage_by_block_range_while(
707            range,
708            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
709            |block_state, predicate| {
710                let header = block_state.block_ref().recovered_block().sealed_header();
711                predicate(header).then(|| header.clone())
712            },
713            predicate,
714        )
715    }
716}
717
718impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
719    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
720        self.get_in_memory_or_storage_by_block(
721            number.into(),
722            |db_provider| db_provider.block_hash(number),
723            |block_state| Ok(Some(block_state.hash())),
724        )
725    }
726
727    fn canonical_hashes_range(
728        &self,
729        start: BlockNumber,
730        end: BlockNumber,
731    ) -> ProviderResult<Vec<B256>> {
732        self.get_in_memory_or_storage_by_block_range_while(
733            start..end,
734            |db_provider, inclusive_range, _| {
735                db_provider
736                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
737            },
738            |block_state, _| Some(block_state.hash()),
739            |_| true,
740        )
741    }
742}
743
744impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
745    fn chain_info(&self) -> ProviderResult<ChainInfo> {
746        let best_number = self.best_block_number()?;
747        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
748    }
749
750    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
751        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
752    }
753
754    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
755        self.storage_provider.last_block_number()
756    }
757
758    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
759        self.get_in_memory_or_storage_by_block(
760            hash.into(),
761            |db_provider| db_provider.block_number(hash),
762            |block_state| Ok(Some(block_state.number())),
763        )
764    }
765}
766
/// Block-id resolution for the pending, safe and finalized tags.
///
/// All three are read from the live canonical in-memory state rather than the snapshot, so their
/// values may change while this provider is held.
impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
    /// Number and hash of the current pending block, if any.
    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.pending_block_num_hash())
    }

    /// Number and hash of the current safe block, if any.
    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_safe_num_hash())
    }

    /// Number and hash of the current finalized block, if any.
    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
    }
}
780
781impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
782    type Block = BlockTy<N>;
783
784    fn find_block_by_hash(
785        &self,
786        hash: B256,
787        source: BlockSource,
788    ) -> ProviderResult<Option<Self::Block>> {
789        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
790            let Some(block) = self.get_in_memory_or_storage_by_block(
791                hash.into(),
792                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
793                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
794            )?
795        {
796            return Ok(Some(block))
797        }
798
799        if matches!(source, BlockSource::Pending | BlockSource::Any) {
800            return Ok(self
801                .canonical_in_memory_state
802                .pending_block()
803                .filter(|b| b.hash() == hash)
804                .map(|b| b.into_block()))
805        }
806
807        Ok(None)
808    }
809
810    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
811        self.get_in_memory_or_storage_by_block(
812            id,
813            |db_provider| db_provider.block(id),
814            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
815        )
816    }
817
818    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
819        Ok(self.canonical_in_memory_state.pending_recovered_block())
820    }
821
822    fn pending_block_and_receipts(
823        &self,
824    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
825        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
826    }
827
828    /// Returns the block with senders with matching number or hash from database.
829    ///
830    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
831    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
832    ///
833    /// Returns `None` if block is not found.
834    fn recovered_block(
835        &self,
836        id: BlockHashOrNumber,
837        transaction_kind: TransactionVariant,
838    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
839        self.get_in_memory_or_storage_by_block(
840            id,
841            |db_provider| db_provider.recovered_block(id, transaction_kind),
842            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
843        )
844    }
845
846    fn sealed_block_with_senders(
847        &self,
848        id: BlockHashOrNumber,
849        transaction_kind: TransactionVariant,
850    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851        self.get_in_memory_or_storage_by_block(
852            id,
853            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
854            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
855        )
856    }
857
858    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
859        self.get_in_memory_or_storage_by_block_range_while(
860            range,
861            |db_provider, range, _| db_provider.block_range(range),
862            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
863            |_| true,
864        )
865    }
866
867    fn block_with_senders_range(
868        &self,
869        range: RangeInclusive<BlockNumber>,
870    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
871        self.get_in_memory_or_storage_by_block_range_while(
872            range,
873            |db_provider, range, _| db_provider.block_with_senders_range(range),
874            |block_state, _| Some(block_state.block().recovered_block().clone()),
875            |_| true,
876        )
877    }
878
879    fn recovered_block_range(
880        &self,
881        range: RangeInclusive<BlockNumber>,
882    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
883        self.get_in_memory_or_storage_by_block_range_while(
884            range,
885            |db_provider, range, _| db_provider.recovered_block_range(range),
886            |block_state, _| Some(block_state.block().recovered_block().clone()),
887            |_| true,
888        )
889    }
890
891    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
892        self.get_in_memory_or_storage_by_tx(
893            id.into(),
894            |db_provider| db_provider.block_by_transaction_id(id),
895            |_, _, block_state| Ok(Some(block_state.number())),
896        )
897    }
898}
899
900impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
901    type Transaction = TxTy<N>;
902
903    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
904        self.get_in_memory_or_storage_by_tx(
905            tx_hash.into(),
906            |db_provider| db_provider.transaction_id(tx_hash),
907            |_, tx_number, _| Ok(Some(tx_number)),
908        )
909    }
910
911    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
912        self.get_in_memory_or_storage_by_tx(
913            id.into(),
914            |provider| provider.transaction_by_id(id),
915            |tx_index, _, block_state| {
916                Ok(block_state
917                    .block_ref()
918                    .recovered_block()
919                    .body()
920                    .transactions()
921                    .get(tx_index)
922                    .cloned())
923            },
924        )
925    }
926
927    fn transaction_by_id_unhashed(
928        &self,
929        id: TxNumber,
930    ) -> ProviderResult<Option<Self::Transaction>> {
931        self.get_in_memory_or_storage_by_tx(
932            id.into(),
933            |provider| provider.transaction_by_id_unhashed(id),
934            |tx_index, _, block_state| {
935                Ok(block_state
936                    .block_ref()
937                    .recovered_block()
938                    .body()
939                    .transactions()
940                    .get(tx_index)
941                    .cloned())
942            },
943        )
944    }
945
946    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
947        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
948            return Ok(Some(tx))
949        }
950
951        self.storage_provider.transaction_by_hash(hash)
952    }
953
954    fn transaction_by_hash_with_meta(
955        &self,
956        tx_hash: TxHash,
957    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
958        if let Some((tx, meta)) =
959            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
960        {
961            return Ok(Some((tx, meta)))
962        }
963
964        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
965    }
966
967    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
968        self.get_in_memory_or_storage_by_tx(
969            id.into(),
970            |provider| provider.transaction_block(id),
971            |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
972        )
973    }
974
975    fn transactions_by_block(
976        &self,
977        id: BlockHashOrNumber,
978    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
979        self.get_in_memory_or_storage_by_block(
980            id,
981            |provider| provider.transactions_by_block(id),
982            |block_state| {
983                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
984            },
985        )
986    }
987
988    fn transactions_by_block_range(
989        &self,
990        range: impl RangeBounds<BlockNumber>,
991    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
992        self.get_in_memory_or_storage_by_block_range_while(
993            range,
994            |db_provider, range, _| db_provider.transactions_by_block_range(range),
995            |block_state, _| {
996                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
997            },
998            |_| true,
999        )
1000    }
1001
1002    fn transactions_by_tx_range(
1003        &self,
1004        range: impl RangeBounds<TxNumber>,
1005    ) -> ProviderResult<Vec<Self::Transaction>> {
1006        self.get_in_memory_or_storage_by_tx_range(
1007            range,
1008            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1009            |index_range, block_state| {
1010                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1011                    .to_vec())
1012            },
1013        )
1014    }
1015
1016    fn senders_by_tx_range(
1017        &self,
1018        range: impl RangeBounds<TxNumber>,
1019    ) -> ProviderResult<Vec<Address>> {
1020        self.get_in_memory_or_storage_by_tx_range(
1021            range,
1022            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1023            |index_range, block_state| {
1024                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1025            },
1026        )
1027    }
1028
1029    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1030        self.get_in_memory_or_storage_by_tx(
1031            id.into(),
1032            |provider| provider.transaction_sender(id),
1033            |tx_index, _, block_state| {
1034                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1035            },
1036        )
1037    }
1038}
1039
1040impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1041    type Receipt = ReceiptTy<N>;
1042
1043    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1044        self.get_in_memory_or_storage_by_tx(
1045            id.into(),
1046            |provider| provider.receipt(id),
1047            |tx_index, _, block_state| {
1048                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1049            },
1050        )
1051    }
1052
1053    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1054        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1055            let executed_block = block_state.block_ref();
1056            let block = executed_block.recovered_block();
1057            let receipts = block_state.executed_block_receipts();
1058
1059            // assuming 1:1 correspondence between transactions and receipts
1060            debug_assert_eq!(
1061                block.body().transactions().len(),
1062                receipts.len(),
1063                "Mismatch between transaction and receipt count"
1064            );
1065
1066            if let Some(tx_index) =
1067                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1068            {
1069                // safe to use tx_index for receipts due to 1:1 correspondence
1070                return Ok(receipts.get(tx_index).cloned());
1071            }
1072        }
1073
1074        self.storage_provider.receipt_by_hash(hash)
1075    }
1076
1077    fn receipts_by_block(
1078        &self,
1079        block: BlockHashOrNumber,
1080    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1081        self.get_in_memory_or_storage_by_block(
1082            block,
1083            |db_provider| db_provider.receipts_by_block(block),
1084            |block_state| Ok(Some(block_state.executed_block_receipts())),
1085        )
1086    }
1087
1088    fn receipts_by_tx_range(
1089        &self,
1090        range: impl RangeBounds<TxNumber>,
1091    ) -> ProviderResult<Vec<Self::Receipt>> {
1092        self.get_in_memory_or_storage_by_tx_range(
1093            range,
1094            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1095            |index_range, block_state| {
1096                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1097            },
1098        )
1099    }
1100
1101    fn receipts_by_block_range(
1102        &self,
1103        block_range: RangeInclusive<BlockNumber>,
1104    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1105        self.storage_provider.receipts_by_block_range(block_range)
1106    }
1107}
1108
1109impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1110    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1111        match block {
1112            BlockId::Hash(rpc_block_hash) => {
1113                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1114                if receipts.is_none() &&
1115                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1116                    let Some(state) = self
1117                        .head_block
1118                        .as_ref()
1119                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1120                {
1121                    receipts = Some(state.executed_block_receipts());
1122                }
1123                Ok(receipts)
1124            }
1125            BlockId::Number(num_tag) => match num_tag {
1126                BlockNumberOrTag::Pending => Ok(self
1127                    .canonical_in_memory_state
1128                    .pending_state()
1129                    .map(|block_state| block_state.executed_block_receipts())),
1130                _ => {
1131                    if let Some(num) = self.convert_block_number(num_tag)? {
1132                        self.receipts_by_block(num.into())
1133                    } else {
1134                        Ok(None)
1135                    }
1136                }
1137            },
1138        }
1139    }
1140}
1141
1142impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1143    fn block_body_indices(
1144        &self,
1145        number: BlockNumber,
1146    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1147        self.get_in_memory_or_storage_by_block(
1148            number.into(),
1149            |db_provider| db_provider.block_body_indices(number),
1150            |block_state| {
1151                // Find the last block indices on database
1152                let last_storage_block_number = block_state.anchor().number;
1153                let mut stored_indices = self
1154                    .storage_provider
1155                    .block_body_indices(last_storage_block_number)?
1156                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1157
1158                // Prepare our block indices
1159                stored_indices.first_tx_num = stored_indices.next_tx_num();
1160                stored_indices.tx_count = 0;
1161
1162                // Iterate from the lowest block in memory until our target block
1163                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1164                    let block_tx_count =
1165                        state.block_ref().recovered_block().body().transactions().len() as u64;
1166                    if state.block_ref().recovered_block().number() == number {
1167                        stored_indices.tx_count = block_tx_count;
1168                    } else {
1169                        stored_indices.first_tx_num += block_tx_count;
1170                    }
1171                }
1172
1173                Ok(Some(stored_indices))
1174            },
1175        )
1176    }
1177
1178    fn block_body_indices_range(
1179        &self,
1180        range: RangeInclusive<BlockNumber>,
1181    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1182        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1183    }
1184}
1185
/// Stage checkpoints are only tracked in storage, so every method delegates to the storage
/// provider.
impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
    /// Returns the stored checkpoint of stage `id`, if any.
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        self.storage_provider.get_stage_checkpoint(id)
    }

    /// Returns the raw stored progress bytes of stage `id`, if any.
    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        self.storage_provider.get_stage_checkpoint_progress(id)
    }

    /// Returns every stored stage checkpoint keyed by stage name.
    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.storage_provider.get_all_checkpoints()
    }
}
1199
/// Prune checkpoints are read from the storage provider.
impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
    /// Returns the stored prune checkpoint for `segment`, if any.
    fn get_prune_checkpoint(
        &self,
        segment: PruneSegment,
    ) -> ProviderResult<Option<PruneCheckpoint>> {
        self.storage_provider.get_prune_checkpoint(segment)
    }

    /// Returns all stored prune checkpoints together with their segments.
    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
        self.storage_provider.get_prune_checkpoints()
    }
}
1212
impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
    type ChainSpec = N::ChainSpec;

    /// Returns the chain spec held by the storage provider.
    // Fully-qualified call spells out that the `ChainSpecProvider` trait method is used.
    fn chain_spec(&self) -> Arc<N::ChainSpec> {
        ChainSpecProvider::chain_spec(&self.storage_provider)
    }
}
1220
1221impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1222    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1223        match id {
1224            BlockId::Number(num) => self.block_by_number_or_tag(num),
1225            BlockId::Hash(hash) => {
1226                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1227                // so not at the provider level?
1228                // if we decide to do this at a higher level, then we can make this an automatic
1229                // trait impl
1230                if Some(true) == hash.require_canonical {
1231                    // check the database, canonical blocks are only stored in the database
1232                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1233                } else {
1234                    self.block_by_hash(hash.block_hash)
1235                }
1236            }
1237        }
1238    }
1239
1240    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1241        Ok(match id {
1242            BlockNumberOrTag::Latest => {
1243                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1244            }
1245            BlockNumberOrTag::Finalized => {
1246                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1247            }
1248            BlockNumberOrTag::Safe => {
1249                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1250            }
1251            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1252            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1253
1254            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1255        })
1256    }
1257
1258    fn sealed_header_by_number_or_tag(
1259        &self,
1260        id: BlockNumberOrTag,
1261    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1262        match id {
1263            BlockNumberOrTag::Latest => {
1264                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1265            }
1266            BlockNumberOrTag::Finalized => {
1267                Ok(self.canonical_in_memory_state.get_finalized_header())
1268            }
1269            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1270            BlockNumberOrTag::Earliest => self
1271                .header_by_number(self.earliest_block_number()?)?
1272                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1273            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1274            BlockNumberOrTag::Number(num) => self
1275                .header_by_number(num)?
1276                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1277        }
1278    }
1279
1280    fn sealed_header_by_id(
1281        &self,
1282        id: BlockId,
1283    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1284        Ok(match id {
1285            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1286            BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1287        })
1288    }
1289
1290    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1291        Ok(match id {
1292            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1293            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1294        })
1295    }
1296}
1297
1298impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1299    fn storage_changeset(
1300        &self,
1301        block_number: BlockNumber,
1302    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1303        if let Some(state) =
1304            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1305        {
1306            let changesets = state
1307                .block()
1308                .execution_output
1309                .bundle
1310                .reverts
1311                .clone()
1312                .to_plain_state_reverts()
1313                .storage
1314                .into_iter()
1315                .flatten()
1316                .flat_map(|revert: PlainStorageRevert| {
1317                    revert.storage_revert.into_iter().map(move |(key, value)| {
1318                        (
1319                            BlockNumberAddress((block_number, revert.address)),
1320                            StorageEntry { key: key.into(), value: value.to_previous_value() },
1321                        )
1322                    })
1323                })
1324                .collect();
1325            Ok(changesets)
1326        } else {
1327            // Perform checks on whether or not changesets exist for the block.
1328
1329            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1330            let storage_history_exists = self
1331                .storage_provider
1332                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1333                .and_then(|checkpoint| {
1334                    // return true if the block number is ahead of the prune checkpoint.
1335                    //
1336                    // The checkpoint stores the highest pruned block number, so we should make
1337                    // sure the block_number is strictly greater.
1338                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1339                })
1340                .unwrap_or(true);
1341
1342            if !storage_history_exists {
1343                return Err(ProviderError::StateAtBlockPruned(block_number))
1344            }
1345
1346            self.storage_provider.storage_changeset(block_number)
1347        }
1348    }
1349}
1350
1351impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1352    fn account_block_changeset(
1353        &self,
1354        block_number: BlockNumber,
1355    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1356        if let Some(state) =
1357            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1358        {
1359            let changesets = state
1360                .block_ref()
1361                .execution_output
1362                .bundle
1363                .reverts
1364                .clone()
1365                .to_plain_state_reverts()
1366                .accounts
1367                .into_iter()
1368                .flatten()
1369                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1370                .collect();
1371            Ok(changesets)
1372        } else {
1373            // Perform checks on whether or not changesets exist for the block.
1374
1375            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1376            let account_history_exists = self
1377                .storage_provider
1378                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1379                .and_then(|checkpoint| {
1380                    // return true if the block number is ahead of the prune checkpoint.
1381                    //
1382                    // The checkpoint stores the highest pruned block number, so we should make
1383                    // sure the block_number is strictly greater.
1384                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1385                })
1386                .unwrap_or(true);
1387
1388            if !account_history_exists {
1389                return Err(ProviderError::StateAtBlockPruned(block_number))
1390            }
1391
1392            self.storage_provider.account_block_changeset(block_number)
1393        }
1394    }
1395
1396    fn get_account_before_block(
1397        &self,
1398        block_number: BlockNumber,
1399        address: Address,
1400    ) -> ProviderResult<Option<AccountBeforeTx>> {
1401        if let Some(state) =
1402            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1403        {
1404            // Search in-memory state for the account changeset
1405            let changeset = state
1406                .block_ref()
1407                .execution_output
1408                .bundle
1409                .reverts
1410                .clone()
1411                .to_plain_state_reverts()
1412                .accounts
1413                .into_iter()
1414                .flatten()
1415                .find(|(addr, _)| addr == &address)
1416                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1417            Ok(changeset)
1418        } else {
1419            // Perform checks on whether or not changesets exist for the block.
1420            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1421            let account_history_exists = self
1422                .storage_provider
1423                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1424                .and_then(|checkpoint| {
1425                    // return true if the block number is ahead of the prune checkpoint.
1426                    //
1427                    // The checkpoint stores the highest pruned block number, so we should make
1428                    // sure the block_number is strictly greater.
1429                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1430                })
1431                .unwrap_or(true);
1432
1433            if !account_history_exists {
1434                return Err(ProviderError::StateAtBlockPruned(block_number))
1435            }
1436
1437            // Delegate to the storage provider for database lookups
1438            self.storage_provider.get_account_before_block(block_number, address)
1439        }
1440    }
1441}
1442
1443impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1444    /// Get basic account information.
1445    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1446        // use latest state provider
1447        let state_provider = self.latest_ref()?;
1448        state_provider.basic_account(address)
1449    }
1450}
1451
1452impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1453    type Receipt = ReceiptTy<N>;
1454
1455    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1456    ///
1457    /// If data for the block does not exist, this will return [`None`].
1458    ///
1459    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1460    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1461    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1462    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1463    /// first place.
1464    fn get_state(
1465        &self,
1466        block: BlockNumber,
1467    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1468        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1469            let state = state.block_ref().execution_outcome().clone();
1470            Ok(Some(state))
1471        } else {
1472            Self::get_state(self, block..=block)
1473        }
1474    }
1475}
1476
/// Trie data is read from the storage provider.
impl<N: ProviderNodeTypes> TrieReader for ConsistentProvider<N> {
    /// Returns the sorted trie reverts starting at block `from`, as reported by storage.
    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
        self.storage_provider.trie_reverts(from)
    }

    /// Returns the sorted trie updates of `block_number`, as reported by storage.
    fn get_block_trie_updates(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted> {
        self.storage_provider.get_block_trie_updates(block_number)
    }
}
1489
#[cfg(test)]
mod tests {
    use crate::{
        providers::blockchain_provider::BlockchainProvider,
        test_utils::create_test_provider_factory, BlockWriter,
    };
    use alloy_eips::BlockHashOrNumber;
    use alloy_primitives::B256;
    use itertools::Itertools;
    use rand::Rng;
    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
    use reth_db_api::models::AccountBeforeTx;
    use reth_ethereum_primitives::Block;
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use revm_database::BundleState;
    use std::{
        ops::{Bound, Range, RangeBounds},
        sync::Arc,
    };

    const TEST_BLOCKS_COUNT: usize = 5;

    /// Converts arbitrary `tx_count` bounds into the half-open `Range<u8>` used by the
    /// `tx_count` field of `BlockRangeParams`, honouring whether each bound is inclusive or
    /// exclusive.
    ///
    /// Arithmetic saturates, so an inclusive end of `u8::MAX` maps to an exclusive end of
    /// `u8::MAX`, the same as an unbounded end.
    fn tx_count_range(tx_count: impl RangeBounds<u8>) -> Range<u8> {
        let start = match tx_count.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.saturating_add(1),
            Bound::Unbounded => u8::MIN,
        };
        let end = match tx_count.end_bound() {
            Bound::Included(&n) => n.saturating_add(1),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => u8::MAX,
        };
        start..end
    }

    /// Generates `database_blocks + in_memory_blocks` random blocks numbered from 0 (the first
    /// one with parent `B256::ZERO`) and splits them into `(database_blocks, in_memory_blocks)`.
    ///
    /// `tx_count` is converted with [`tx_count_range`], so `0..1` means exactly zero
    /// transactions per block.
    ///
    /// # Panics
    ///
    /// Panics if `database_blocks + in_memory_blocks == 0`.
    fn random_blocks(
        rng: &mut impl Rng,
        database_blocks: usize,
        in_memory_blocks: usize,
        requests_count: Option<Range<u8>>,
        withdrawals_count: Option<Range<u8>>,
        tx_count: impl RangeBounds<u8>,
    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
        assert!(database_blocks + in_memory_blocks > 0, "random_blocks requires at least one block");
        // Highest block number to generate (the range below is inclusive).
        let block_range = (database_blocks + in_memory_blocks - 1) as u64;

        let blocks = random_block_range(
            rng,
            0..=block_range,
            BlockRangeParams {
                parent: Some(B256::ZERO),
                tx_count: tx_count_range(tx_count),
                requests_count,
                withdrawals_count,
            },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
        (database_blocks.to_vec(), in_memory_blocks.to_vec())
    }

    #[test]
    fn test_tx_count_range() {
        assert_eq!(tx_count_range(0..1), 0..1);
        assert_eq!(tx_count_range(0..=1), 0..2);
        assert_eq!(tx_count_range(3..), 3..u8::MAX);
        assert_eq!(tx_count_range(..4), 0..4);
        assert_eq!(tx_count_range(..), 0..u8::MAX);
        assert_eq!(tx_count_range((Bound::Excluded(1u8), Bound::Included(4u8))), 2..5);
        assert_eq!(tx_count_range(0..=u8::MAX), 0..u8::MAX);
    }

    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        // Initialize random number generator and provider factory
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate 11 random blocks (numbers 0..=10): the first 5 go to the database and the
        // remaining 6 are candidates for the in-memory state
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert first 5 blocks into the database
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create a new provider
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Useful blocks
        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // No block in memory before setting in memory state
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        // No pending block in memory
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Insert first block into the in-memory state
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                ..Default::default()
            }],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // A consistent provider is a snapshot taken at creation time, so take a fresh one to
        // observe the updated canonical chain
        let consistent_provider = provider.consistent_provider()?;

        // Now the block should be found in memory
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into_block())
        );

        // Find the first block in database by hash
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into_block())
        );

        // No pending block in database
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Insert the last block into the pending state
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
            recovered_block: Arc::new(RecoveredBlock::new_sealed(
                last_in_mem_block.clone(),
                Default::default(),
            )),
            ..Default::default()
        });

        // The pending block is not part of the snapshot, so the existing consistent provider
        // already sees it without being recreated
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone_block())
        );

        Ok(())
    }

    #[test]
    fn test_block_reader_block() -> eyre::Result<()> {
        // Initialize random number generator and provider factory
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate 11 random blocks (numbers 0..=10): the first 5 go to the database and the
        // remaining 6 are candidates for the in-memory state
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert first 5 blocks into the database
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create a new provider
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // First in memory block
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        // First database block
        let first_db_block = database_blocks.first().unwrap();

        // First in memory block should not be found yet as not integrated to the in-memory state
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            None
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            None
        );

        // Insert first block into the in-memory state
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                ..Default::default()
            }],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);

        // Take a fresh snapshot so the updated canonical chain is visible
        let consistent_provider = provider.consistent_provider()?;

        // First in memory block should be found
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            Some(first_in_mem_block.clone().into_block())
        );

        // First database block should be found
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
            Some(first_db_block.clone().into_block())
        );

        Ok(())
    }

    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        // TEST_BLOCKS_COUNT database blocks plus a single in-memory block, no transactions
        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        // In-memory changesets continue from the final database state
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets
                        .iter()
                        .map(|block_changesets| {
                            block_changesets.iter().map(|(address, account, _)| {
                                (*address, Some(Some((*account).into())), [])
                            })
                        })
                        .collect::<Vec<_>>(),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlock {
                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
                            block.clone(),
                            senders,
                        )),
                        execution_output: Arc::new(ExecutionOutcome {
                            bundle: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            first_block: first_in_memory_block,
                            ..Default::default()
                        }),
                        ..Default::default()
                    }
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        let consistent_provider = provider.consistent_provider()?;

        // Changeset of the last database block is served from the database
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .next_back()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // Changeset of the in-memory block is served from the in-memory state
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}