reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
5    BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider, ProviderError,
6    PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader,
7    StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TrieReader,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12    HashOrNumber,
13};
14use alloy_primitives::{
15    map::{hash_map, HashMap},
16    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_static_file_types::StaticFileSegment;
27use reth_storage_api::{
28    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
29    StorageChangeSetReader, TryIntoHistoricalStateProvider,
30};
31use reth_storage_errors::provider::ProviderResult;
32use reth_trie::updates::TrieUpdatesSorted;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
/// this provider.
///
/// The snapshot consists of a read-only database provider and the in-memory head block, both
/// captured when the provider is created. Block lookups consult the in-memory chain reachable
/// from the head block first and fall back to the database provider.
///
/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
/// time-out.
#[derive(Debug)]
#[doc(hidden)] // triggers ICE for `cargo docs`
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Storage provider, obtained from the [`ProviderFactory`] when [`Self`] is created.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head block at time of [`Self`] creation. `None` means there was no in-memory head state,
    /// in which case lookups are served by `storage_provider` only.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
218    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
219    /// storage and account changesets.
220    fn populate_bundle_state(
221        &self,
222        account_changeset: Vec<(u64, AccountBeforeTx)>,
223        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224        block_range_end: BlockNumber,
225    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226        let mut state: BundleStateInit = HashMap::default();
227        let mut reverts: RevertsInit = HashMap::default();
228        let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230        // add account changeset changes
231        for (block_number, account_before) in account_changeset.into_iter().rev() {
232            let AccountBeforeTx { info: old_info, address } = account_before;
233            match state.entry(address) {
234                hash_map::Entry::Vacant(entry) => {
235                    let new_info = state_provider.basic_account(&address)?;
236                    entry.insert((old_info, new_info, HashMap::default()));
237                }
238                hash_map::Entry::Occupied(mut entry) => {
239                    // overwrite old account state.
240                    entry.get_mut().0 = old_info;
241                }
242            }
243            // insert old info into reverts.
244            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245        }
246
247        // add storage changeset changes
248        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249            let BlockNumberAddress((block_number, address)) = block_and_address;
250            // get account state or insert from plain state.
251            let account_state = match state.entry(address) {
252                hash_map::Entry::Vacant(entry) => {
253                    let present_info = state_provider.basic_account(&address)?;
254                    entry.insert((present_info, present_info, HashMap::default()))
255                }
256                hash_map::Entry::Occupied(entry) => entry.into_mut(),
257            };
258
259            // match storage.
260            match account_state.2.entry(old_storage.key) {
261                hash_map::Entry::Vacant(entry) => {
262                    let new_storage_value =
263                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264                    entry.insert((old_storage.value, new_storage_value));
265                }
266                hash_map::Entry::Occupied(mut entry) => {
267                    entry.get_mut().0 = old_storage.value;
268                }
269            };
270
271            reverts
272                .entry(block_number)
273                .or_default()
274                .entry(address)
275                .or_default()
276                .1
277                .push(old_storage);
278        }
279
280        Ok((state, reverts))
281    }
282
    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
    /// is met.
    ///
    /// Operates on the in-memory chain and the database provider held by this provider, so both
    /// parts of the result come from the same view. Splits the range into in-memory and storage
    /// sections, prioritizing recent in-memory blocks in case of overlaps. Items are returned in
    /// ascending block order: storage items first, followed by in-memory items.
    ///
    /// * `range`: the requested block numbers. An unbounded end resolves to the highest in-memory
    ///   block or, if there is no in-memory chain, to the last block number of the database (`0`
    ///   if that lookup fails).
    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
    ///   user to retrieve the required items from the database using [`RangeInclusive`].
    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
    ///   state, allowing for selection or filtering for the desired data. Returning `None` stops
    ///   the collection.
    /// * `predicate` (`P`) is only forwarded to the two functions above; it is never invoked by
    ///   this method directly.
    ///
    /// Returns an empty vector if the resolved range is empty (`start > end`).
    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_db_range: F,
        map_block_state_item: G,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<BlockNumber>,
            &mut P,
        ) -> ProviderResult<Vec<T>>,
        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
        P: FnMut(&T) -> bool,
    {
        // Both views used below are fields of this provider: the in-memory chain is walked from
        // `head_block`, and the database is accessed through `storage_provider`.
        //
        // The collected chain is ordered from the highest block (first) to the lowest (last).
        let mut in_memory_chain =
            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
        let db_provider = &self.storage_provider;

        let (start, end) = self.convert_range_bounds(range, || {
            // the first block is the highest one.
            in_memory_chain
                .first()
                .map(|b| b.number())
                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
        });

        // Empty range: nothing to fetch.
        if start > end {
            return Ok(vec![])
        }

        // Split range into storage_range and in-memory range. If the in-memory range is not
        // necessary drop it early.
        //
        // The last block of `in_memory_chain` is the lowest block number.
        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
            Some(lowest_memory_block) if lowest_memory_block <= end => {
                // `last()` returned a block, so the chain is non-empty and `first()` exists.
                let highest_memory_block =
                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

                // Database will for a time overlap with in-memory-chain blocks. In
                // case of a re-org, it can mean that the database blocks are of a forked chain, and
                // so, we should prioritize the in-memory overlapped blocks.
                let in_memory_range =
                    lowest_memory_block.max(start)..=end.min(highest_memory_block);

                // If requested range is in the middle of the in-memory range, remove the necessary
                // lowest blocks. They sit at the tail of the vector, hence `truncate`.
                in_memory_chain.truncate(
                    in_memory_chain
                        .len()
                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
                );

                // Only blocks strictly below the lowest in-memory block are read from storage.
                let storage_range =
                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

                (Some((in_memory_chain, in_memory_range)), storage_range)
            }
            _ => {
                // Drop the in-memory chain so we don't hold blocks in memory.
                drop(in_memory_chain);

                (None, Some(start..=end))
            }
        };

        // NOTE(review): `end - start + 1` overflows if the range spans the whole `u64` domain
        // (`start == 0` and `end == u64::MAX`), and the capacity follows the requested range
        // rather than the number of existing blocks — confirm callers bound their ranges.
        let mut items = Vec::with_capacity((end - start + 1) as usize);

        if let Some(storage_range) = storage_range {
            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
            items.append(&mut db_items);

            // The predicate was not met, if the number of items differs from the expected. So, we
            // return what we have.
            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
                return Ok(items)
            }
        }

        if let Some((in_memory_chain, in_memory_range)) = in_memory {
            // The chain is stored highest-first, so it is reversed to walk it in ascending order
            // alongside the block numbers of the in-memory range.
            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
                debug_assert!(num == block.number());
                if let Some(item) = map_block_state_item(block, &mut predicate) {
                    items.push(item);
                } else {
                    // The mapper produced nothing for this block: stop and return what we have.
                    break
                }
            }
        }

        Ok(items)
    }
395
396    /// This uses a given [`BlockState`] to initialize a state provider for that block.
397    fn block_state_provider_ref(
398        &self,
399        state: &BlockState<N::Primitives>,
400    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401        let anchor_hash = state.anchor().hash;
402        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405    }
406
    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
    ///
    /// Transactions numbered below the first in-memory transaction are read from the database;
    /// the remaining ones are read block by block from the in-memory chain, lowest block first.
    /// Items are returned in ascending transaction order.
    ///
    /// * `range`: the requested transaction numbers. An unbounded end resolves to the last
    ///   database transaction number plus the number of in-memory transactions.
    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
    ///
    /// Fails with [`ProviderError::BlockBodyIndicesNotFound`] if the last database block has no
    /// body indices. Returns an empty vector if the resolved range is empty (`start > end`).
    ///
    /// NOTE(review): `range` is declared as `RangeBounds<BlockNumber>` although its values are
    /// compared against transaction numbers below; this looks like it should read `TxNumber` —
    /// confirm.
    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_from_db: S,
        fetch_from_block_state: M,
    ) -> ProviderResult<Vec<R>>
    where
        S: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<TxNumber>,
        ) -> ProviderResult<Vec<R>>,
        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
    {
        // In-memory chain of the head block; empty if there is no head block. The last element
        // is treated as the lowest block (its anchor is the block right below the chain).
        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
        let provider = &self.storage_provider;

        // Get the last block number stored in the storage which does NOT overlap with in-memory
        // chain.
        let last_database_block_number = in_mem_chain
            .last()
            .map(|b| Ok(b.anchor().number))
            .unwrap_or_else(|| provider.last_block_number())?;

        // Get the next tx number for the last block stored in the storage, which marks the start of
        // the in-memory state.
        let last_block_body_index = provider
            .block_body_indices(last_database_block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
        // Running cursor: the tx number of the first transaction of the in-memory block that is
        // currently being looked at.
        let mut in_memory_tx_num = last_block_body_index.next_tx_num();

        let (start, end) = self.convert_range_bounds(range, || {
            // Unbounded end: every in-memory transaction on top of the last database one.
            in_mem_chain
                .iter()
                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
                .sum::<u64>() +
                last_block_body_index.last_tx_num()
        });

        // Empty range: nothing to fetch.
        if start > end {
            return Ok(vec![])
        }

        let mut tx_range = start..=end;

        // If the range is entirely before the first in-memory transaction number, fetch from
        // storage
        if *tx_range.end() < in_memory_tx_num {
            return fetch_from_db(provider, tx_range);
        }

        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

        // If the range spans storage and memory, get elements from storage first.
        if *tx_range.start() < in_memory_tx_num {
            // Determine the range that needs to be fetched from storage.
            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

            // Set the remaining transaction range for in-memory
            tx_range = in_memory_tx_num..=*tx_range.end();

            items.extend(fetch_from_db(provider, db_range)?);
        }

        // Iterate from the lowest block to the highest in-memory chain
        for block_state in in_mem_chain.iter().rev() {
            let block_tx_count =
                block_state.block_ref().recovered_block().body().transactions().len();
            // Number of transactions still missing from the requested range.
            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

            // If the transaction range start is equal or higher than the next block first
            // transaction, advance. This also skips blocks without transactions, which keeps
            // `block_tx_count - skip` below from underflowing.
            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
                in_memory_tx_num += block_tx_count as u64;
                continue
            }

            // This should only be more than 0 once, in case of a partial range inside a block.
            let skip = (tx_range.start() - in_memory_tx_num) as usize;

            // Indices are relative to the block: from `skip` up to either the end of the block
            // or the end of the requested range, whichever comes first.
            items.extend(fetch_from_block_state(
                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
                block_state,
            )?);

            // Move the cursor to the first transaction of the next block.
            in_memory_tx_num += block_tx_count as u64;

            // Break if the range has been fully processed
            if in_memory_tx_num > *tx_range.end() {
                break
            }

            // Set updated range
            tx_range = in_memory_tx_num..=*tx_range.end();
        }

        Ok(items)
    }
509
510    /// Fetches data from either in-memory state or persistent storage by transaction
511    /// [`HashOrNumber`].
512    fn get_in_memory_or_storage_by_tx<S, M, R>(
513        &self,
514        id: HashOrNumber,
515        fetch_from_db: S,
516        fetch_from_block_state: M,
517    ) -> ProviderResult<Option<R>>
518    where
519        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521    {
522        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523        let provider = &self.storage_provider;
524
525        // Get the last block number stored in the database which does NOT overlap with in-memory
526        // chain.
527        let last_database_block_number = in_mem_chain
528            .last()
529            .map(|b| Ok(b.anchor().number))
530            .unwrap_or_else(|| provider.last_block_number())?;
531
532        // Get the next tx number for the last block stored in the database and consider it the
533        // first tx number of the in-memory state
534        let last_block_body_index = provider
535            .block_body_indices(last_database_block_number)?
536            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539        // If the transaction number is less than the first in-memory transaction number, make a
540        // database lookup
541        if let HashOrNumber::Number(id) = id &&
542            id < in_memory_tx_num
543        {
544            return fetch_from_db(provider)
545        }
546
547        // Iterate from the lowest block to the highest
548        for block_state in in_mem_chain.iter().rev() {
549            let executed_block = block_state.block_ref();
550            let block = executed_block.recovered_block();
551
552            for tx_index in 0..block.body().transactions().len() {
553                match id {
554                    HashOrNumber::Hash(tx_hash) => {
555                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557                        }
558                    }
559                    HashOrNumber::Number(id) => {
560                        if id == in_memory_tx_num {
561                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562                        }
563                    }
564                }
565
566                in_memory_tx_num += 1;
567            }
568        }
569
570        // Not found in-memory, so check database.
571        if let HashOrNumber::Hash(_) = id {
572            return fetch_from_db(provider)
573        }
574
575        Ok(None)
576    }
577
578    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
579    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580        &self,
581        id: BlockHashOrNumber,
582        fetch_from_db: S,
583        fetch_from_block_state: M,
584    ) -> ProviderResult<R>
585    where
586        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588    {
589        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590            return fetch_from_block_state(block_state)
591        }
592        fetch_from_db(&self.storage_provider)
593    }
594
595    /// Consumes the provider and returns a state provider for the specific block hash.
596    pub(crate) fn into_state_provider_at_block_hash(
597        self,
598        block_hash: BlockHash,
599    ) -> ProviderResult<Box<dyn StateProvider>> {
600        let Self { storage_provider, head_block, .. } = self;
601        let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
602            let block_number = storage_provider
603                .block_number(block_hash)?
604                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605            storage_provider.try_into_history_at_block(block_number)
606        };
607        if let Some(Some(block_state)) =
608            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609        {
610            let anchor_hash = block_state.anchor().hash;
611            let latest_historical = into_history_at_block_hash(anchor_hash)?;
612            return Ok(Box::new(block_state.state_provider(latest_historical)));
613        }
614        into_history_at_block_hash(block_hash)
615    }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619    /// Ensures that the given block number is canonical (synced)
620    ///
621    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
622    /// out of range and would lead to invalid results, mainly during initial sync.
623    ///
624    /// Verifying the `block_number` would be expensive since we need to lookup sync table
625    /// Instead, we ensure that the `block_number` is within the range of the
626    /// [`Self::best_block_number`] which is updated when a block is synced.
627    #[inline]
628    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629        let latest = self.best_block_number()?;
630        if block_number > latest {
631            Err(ProviderError::HeaderNotFound(block_number.into()))
632        } else {
633            Ok(())
634        }
635    }
636}
637
// The provider exposes the primitives of the node types `N` it is parameterized with.
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    type Primitives = N::Primitives;
}
641
// Static file access is delegated to the underlying storage provider.
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the underlying storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }

    /// Returns the static file writer of the underlying storage provider for the given block and
    /// segment.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.storage_provider.get_static_file_writer(block, segment)
    }
}
655
656impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
657    type Header = HeaderTy<N>;
658
659    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
660        self.get_in_memory_or_storage_by_block(
661            block_hash.into(),
662            |db_provider| db_provider.header(block_hash),
663            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664        )
665    }
666
667    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
668        self.get_in_memory_or_storage_by_block(
669            num.into(),
670            |db_provider| db_provider.header_by_number(num),
671            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
672        )
673    }
674
675    fn headers_range(
676        &self,
677        range: impl RangeBounds<BlockNumber>,
678    ) -> ProviderResult<Vec<Self::Header>> {
679        self.get_in_memory_or_storage_by_block_range_while(
680            range,
681            |db_provider, range, _| db_provider.headers_range(range),
682            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
683            |_| true,
684        )
685    }
686
687    fn sealed_header(
688        &self,
689        number: BlockNumber,
690    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
691        self.get_in_memory_or_storage_by_block(
692            number.into(),
693            |db_provider| db_provider.sealed_header(number),
694            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
695        )
696    }
697
698    fn sealed_headers_range(
699        &self,
700        range: impl RangeBounds<BlockNumber>,
701    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
702        self.get_in_memory_or_storage_by_block_range_while(
703            range,
704            |db_provider, range, _| db_provider.sealed_headers_range(range),
705            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
706            |_| true,
707        )
708    }
709
710    fn sealed_headers_while(
711        &self,
712        range: impl RangeBounds<BlockNumber>,
713        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
714    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
715        self.get_in_memory_or_storage_by_block_range_while(
716            range,
717            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
718            |block_state, predicate| {
719                let header = block_state.block_ref().recovered_block().sealed_header();
720                predicate(header).then(|| header.clone())
721            },
722            predicate,
723        )
724    }
725}
726
727impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
728    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
729        self.get_in_memory_or_storage_by_block(
730            number.into(),
731            |db_provider| db_provider.block_hash(number),
732            |block_state| Ok(Some(block_state.hash())),
733        )
734    }
735
736    fn canonical_hashes_range(
737        &self,
738        start: BlockNumber,
739        end: BlockNumber,
740    ) -> ProviderResult<Vec<B256>> {
741        self.get_in_memory_or_storage_by_block_range_while(
742            start..end,
743            |db_provider, inclusive_range, _| {
744                db_provider
745                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
746            },
747            |block_state, _| Some(block_state.hash()),
748            |_| true,
749        )
750    }
751}
752
753impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
754    fn chain_info(&self) -> ProviderResult<ChainInfo> {
755        let best_number = self.best_block_number()?;
756        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
757    }
758
759    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
760        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
761    }
762
763    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
764        self.storage_provider.last_block_number()
765    }
766
767    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
768        self.get_in_memory_or_storage_by_block(
769            hash.into(),
770            |db_provider| db_provider.block_number(hash),
771            |block_state| Ok(Some(block_state.number())),
772        )
773    }
774}
775
776impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
777    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
778        Ok(self.canonical_in_memory_state.pending_block_num_hash())
779    }
780
781    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782        Ok(self.canonical_in_memory_state.get_safe_num_hash())
783    }
784
785    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
787    }
788}
789
790impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
791    type Block = BlockTy<N>;
792
793    fn find_block_by_hash(
794        &self,
795        hash: B256,
796        source: BlockSource,
797    ) -> ProviderResult<Option<Self::Block>> {
798        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
799            let Some(block) = self.get_in_memory_or_storage_by_block(
800                hash.into(),
801                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
802                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
803            )?
804        {
805            return Ok(Some(block))
806        }
807
808        if matches!(source, BlockSource::Pending | BlockSource::Any) {
809            return Ok(self
810                .canonical_in_memory_state
811                .pending_block()
812                .filter(|b| b.hash() == hash)
813                .map(|b| b.into_block()))
814        }
815
816        Ok(None)
817    }
818
819    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
820        self.get_in_memory_or_storage_by_block(
821            id,
822            |db_provider| db_provider.block(id),
823            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824        )
825    }
826
827    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
828        Ok(self.canonical_in_memory_state.pending_recovered_block())
829    }
830
831    fn pending_block_and_receipts(
832        &self,
833    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
834        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
835    }
836
837    /// Returns the block with senders with matching number or hash from database.
838    ///
839    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
840    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
841    ///
842    /// Returns `None` if block is not found.
843    fn recovered_block(
844        &self,
845        id: BlockHashOrNumber,
846        transaction_kind: TransactionVariant,
847    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
848        self.get_in_memory_or_storage_by_block(
849            id,
850            |db_provider| db_provider.recovered_block(id, transaction_kind),
851            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
852        )
853    }
854
855    fn sealed_block_with_senders(
856        &self,
857        id: BlockHashOrNumber,
858        transaction_kind: TransactionVariant,
859    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860        self.get_in_memory_or_storage_by_block(
861            id,
862            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
863            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864        )
865    }
866
867    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
868        self.get_in_memory_or_storage_by_block_range_while(
869            range,
870            |db_provider, range, _| db_provider.block_range(range),
871            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
872            |_| true,
873        )
874    }
875
876    fn block_with_senders_range(
877        &self,
878        range: RangeInclusive<BlockNumber>,
879    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
880        self.get_in_memory_or_storage_by_block_range_while(
881            range,
882            |db_provider, range, _| db_provider.block_with_senders_range(range),
883            |block_state, _| Some(block_state.block().recovered_block().clone()),
884            |_| true,
885        )
886    }
887
888    fn recovered_block_range(
889        &self,
890        range: RangeInclusive<BlockNumber>,
891    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892        self.get_in_memory_or_storage_by_block_range_while(
893            range,
894            |db_provider, range, _| db_provider.recovered_block_range(range),
895            |block_state, _| Some(block_state.block().recovered_block().clone()),
896            |_| true,
897        )
898    }
899
900    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
901        self.get_in_memory_or_storage_by_tx(
902            id.into(),
903            |db_provider| db_provider.block_by_transaction_id(id),
904            |_, _, block_state| Ok(Some(block_state.number())),
905        )
906    }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910    type Transaction = TxTy<N>;
911
912    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913        self.get_in_memory_or_storage_by_tx(
914            tx_hash.into(),
915            |db_provider| db_provider.transaction_id(tx_hash),
916            |_, tx_number, _| Ok(Some(tx_number)),
917        )
918    }
919
920    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921        self.get_in_memory_or_storage_by_tx(
922            id.into(),
923            |provider| provider.transaction_by_id(id),
924            |tx_index, _, block_state| {
925                Ok(block_state
926                    .block_ref()
927                    .recovered_block()
928                    .body()
929                    .transactions()
930                    .get(tx_index)
931                    .cloned())
932            },
933        )
934    }
935
936    fn transaction_by_id_unhashed(
937        &self,
938        id: TxNumber,
939    ) -> ProviderResult<Option<Self::Transaction>> {
940        self.get_in_memory_or_storage_by_tx(
941            id.into(),
942            |provider| provider.transaction_by_id_unhashed(id),
943            |tx_index, _, block_state| {
944                Ok(block_state
945                    .block_ref()
946                    .recovered_block()
947                    .body()
948                    .transactions()
949                    .get(tx_index)
950                    .cloned())
951            },
952        )
953    }
954
955    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957            return Ok(Some(tx))
958        }
959
960        self.storage_provider.transaction_by_hash(hash)
961    }
962
963    fn transaction_by_hash_with_meta(
964        &self,
965        tx_hash: TxHash,
966    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967        if let Some((tx, meta)) =
968            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969        {
970            return Ok(Some((tx, meta)))
971        }
972
973        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974    }
975
976    fn transactions_by_block(
977        &self,
978        id: BlockHashOrNumber,
979    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
980        self.get_in_memory_or_storage_by_block(
981            id,
982            |provider| provider.transactions_by_block(id),
983            |block_state| {
984                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
985            },
986        )
987    }
988
989    fn transactions_by_block_range(
990        &self,
991        range: impl RangeBounds<BlockNumber>,
992    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
993        self.get_in_memory_or_storage_by_block_range_while(
994            range,
995            |db_provider, range, _| db_provider.transactions_by_block_range(range),
996            |block_state, _| {
997                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
998            },
999            |_| true,
1000        )
1001    }
1002
1003    fn transactions_by_tx_range(
1004        &self,
1005        range: impl RangeBounds<TxNumber>,
1006    ) -> ProviderResult<Vec<Self::Transaction>> {
1007        self.get_in_memory_or_storage_by_tx_range(
1008            range,
1009            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1010            |index_range, block_state| {
1011                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1012                    .to_vec())
1013            },
1014        )
1015    }
1016
1017    fn senders_by_tx_range(
1018        &self,
1019        range: impl RangeBounds<TxNumber>,
1020    ) -> ProviderResult<Vec<Address>> {
1021        self.get_in_memory_or_storage_by_tx_range(
1022            range,
1023            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1024            |index_range, block_state| {
1025                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1026            },
1027        )
1028    }
1029
1030    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1031        self.get_in_memory_or_storage_by_tx(
1032            id.into(),
1033            |provider| provider.transaction_sender(id),
1034            |tx_index, _, block_state| {
1035                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1036            },
1037        )
1038    }
1039}
1040
1041impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1042    type Receipt = ReceiptTy<N>;
1043
1044    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1045        self.get_in_memory_or_storage_by_tx(
1046            id.into(),
1047            |provider| provider.receipt(id),
1048            |tx_index, _, block_state| {
1049                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1050            },
1051        )
1052    }
1053
1054    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1055        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1056            let executed_block = block_state.block_ref();
1057            let block = executed_block.recovered_block();
1058            let receipts = block_state.executed_block_receipts();
1059
1060            // assuming 1:1 correspondence between transactions and receipts
1061            debug_assert_eq!(
1062                block.body().transactions().len(),
1063                receipts.len(),
1064                "Mismatch between transaction and receipt count"
1065            );
1066
1067            if let Some(tx_index) =
1068                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1069            {
1070                // safe to use tx_index for receipts due to 1:1 correspondence
1071                return Ok(receipts.get(tx_index).cloned());
1072            }
1073        }
1074
1075        self.storage_provider.receipt_by_hash(hash)
1076    }
1077
1078    fn receipts_by_block(
1079        &self,
1080        block: BlockHashOrNumber,
1081    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1082        self.get_in_memory_or_storage_by_block(
1083            block,
1084            |db_provider| db_provider.receipts_by_block(block),
1085            |block_state| Ok(Some(block_state.executed_block_receipts())),
1086        )
1087    }
1088
1089    fn receipts_by_tx_range(
1090        &self,
1091        range: impl RangeBounds<TxNumber>,
1092    ) -> ProviderResult<Vec<Self::Receipt>> {
1093        self.get_in_memory_or_storage_by_tx_range(
1094            range,
1095            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1096            |index_range, block_state| {
1097                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1098            },
1099        )
1100    }
1101
1102    fn receipts_by_block_range(
1103        &self,
1104        block_range: RangeInclusive<BlockNumber>,
1105    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1106        self.storage_provider.receipts_by_block_range(block_range)
1107    }
1108}
1109
1110impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1111    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1112        match block {
1113            BlockId::Hash(rpc_block_hash) => {
1114                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1115                if receipts.is_none() &&
1116                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1117                    let Some(state) = self
1118                        .head_block
1119                        .as_ref()
1120                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121                {
1122                    receipts = Some(state.executed_block_receipts());
1123                }
1124                Ok(receipts)
1125            }
1126            BlockId::Number(num_tag) => match num_tag {
1127                BlockNumberOrTag::Pending => Ok(self
1128                    .canonical_in_memory_state
1129                    .pending_state()
1130                    .map(|block_state| block_state.executed_block_receipts())),
1131                _ => {
1132                    if let Some(num) = self.convert_block_number(num_tag)? {
1133                        self.receipts_by_block(num.into())
1134                    } else {
1135                        Ok(None)
1136                    }
1137                }
1138            },
1139        }
1140    }
1141}
1142
1143impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1144    fn block_body_indices(
1145        &self,
1146        number: BlockNumber,
1147    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1148        self.get_in_memory_or_storage_by_block(
1149            number.into(),
1150            |db_provider| db_provider.block_body_indices(number),
1151            |block_state| {
1152                // Find the last block indices on database
1153                let last_storage_block_number = block_state.anchor().number;
1154                let mut stored_indices = self
1155                    .storage_provider
1156                    .block_body_indices(last_storage_block_number)?
1157                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1158
1159                // Prepare our block indices
1160                stored_indices.first_tx_num = stored_indices.next_tx_num();
1161                stored_indices.tx_count = 0;
1162
1163                // Iterate from the lowest block in memory until our target block
1164                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1165                    let block_tx_count =
1166                        state.block_ref().recovered_block().body().transactions().len() as u64;
1167                    if state.block_ref().recovered_block().number() == number {
1168                        stored_indices.tx_count = block_tx_count;
1169                    } else {
1170                        stored_indices.first_tx_num += block_tx_count;
1171                    }
1172                }
1173
1174                Ok(Some(stored_indices))
1175            },
1176        )
1177    }
1178
1179    fn block_body_indices_range(
1180        &self,
1181        range: RangeInclusive<BlockNumber>,
1182    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1183        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1184    }
1185}
1186
1187impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1188    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1189        self.storage_provider.get_stage_checkpoint(id)
1190    }
1191
1192    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1193        self.storage_provider.get_stage_checkpoint_progress(id)
1194    }
1195
1196    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1197        self.storage_provider.get_all_checkpoints()
1198    }
1199}
1200
1201impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1202    fn get_prune_checkpoint(
1203        &self,
1204        segment: PruneSegment,
1205    ) -> ProviderResult<Option<PruneCheckpoint>> {
1206        self.storage_provider.get_prune_checkpoint(segment)
1207    }
1208
1209    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1210        self.storage_provider.get_prune_checkpoints()
1211    }
1212}
1213
1214impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1215    type ChainSpec = N::ChainSpec;
1216
1217    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1218        ChainSpecProvider::chain_spec(&self.storage_provider)
1219    }
1220}
1221
1222impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1223    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1224        match id {
1225            BlockId::Number(num) => self.block_by_number_or_tag(num),
1226            BlockId::Hash(hash) => {
1227                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1228                // so not at the provider level?
1229                // if we decide to do this at a higher level, then we can make this an automatic
1230                // trait impl
1231                if Some(true) == hash.require_canonical {
1232                    // check the database, canonical blocks are only stored in the database
1233                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1234                } else {
1235                    self.block_by_hash(hash.block_hash)
1236                }
1237            }
1238        }
1239    }
1240
1241    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1242        Ok(match id {
1243            BlockNumberOrTag::Latest => {
1244                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1245            }
1246            BlockNumberOrTag::Finalized => {
1247                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1248            }
1249            BlockNumberOrTag::Safe => {
1250                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1251            }
1252            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1253            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1254
1255            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1256        })
1257    }
1258
1259    fn sealed_header_by_number_or_tag(
1260        &self,
1261        id: BlockNumberOrTag,
1262    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1263        match id {
1264            BlockNumberOrTag::Latest => {
1265                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1266            }
1267            BlockNumberOrTag::Finalized => {
1268                Ok(self.canonical_in_memory_state.get_finalized_header())
1269            }
1270            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1271            BlockNumberOrTag::Earliest => self
1272                .header_by_number(self.earliest_block_number()?)?
1273                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1274            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1275            BlockNumberOrTag::Number(num) => self
1276                .header_by_number(num)?
1277                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1278        }
1279    }
1280
1281    fn sealed_header_by_id(
1282        &self,
1283        id: BlockId,
1284    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1285        Ok(match id {
1286            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1287            BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1288        })
1289    }
1290
1291    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1292        Ok(match id {
1293            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1294            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1295        })
1296    }
1297}
1298
1299impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1300    fn storage_changeset(
1301        &self,
1302        block_number: BlockNumber,
1303    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1304        if let Some(state) =
1305            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1306        {
1307            let changesets = state
1308                .block()
1309                .execution_output
1310                .bundle
1311                .reverts
1312                .clone()
1313                .to_plain_state_reverts()
1314                .storage
1315                .into_iter()
1316                .flatten()
1317                .flat_map(|revert: PlainStorageRevert| {
1318                    revert.storage_revert.into_iter().map(move |(key, value)| {
1319                        (
1320                            BlockNumberAddress((block_number, revert.address)),
1321                            StorageEntry { key: key.into(), value: value.to_previous_value() },
1322                        )
1323                    })
1324                })
1325                .collect();
1326            Ok(changesets)
1327        } else {
1328            // Perform checks on whether or not changesets exist for the block.
1329
1330            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1331            let storage_history_exists = self
1332                .storage_provider
1333                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1334                .and_then(|checkpoint| {
1335                    // return true if the block number is ahead of the prune checkpoint.
1336                    //
1337                    // The checkpoint stores the highest pruned block number, so we should make
1338                    // sure the block_number is strictly greater.
1339                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1340                })
1341                .unwrap_or(true);
1342
1343            if !storage_history_exists {
1344                return Err(ProviderError::StateAtBlockPruned(block_number))
1345            }
1346
1347            self.storage_provider.storage_changeset(block_number)
1348        }
1349    }
1350}
1351
1352impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1353    fn account_block_changeset(
1354        &self,
1355        block_number: BlockNumber,
1356    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1357        if let Some(state) =
1358            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1359        {
1360            let changesets = state
1361                .block_ref()
1362                .execution_output
1363                .bundle
1364                .reverts
1365                .clone()
1366                .to_plain_state_reverts()
1367                .accounts
1368                .into_iter()
1369                .flatten()
1370                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1371                .collect();
1372            Ok(changesets)
1373        } else {
1374            // Perform checks on whether or not changesets exist for the block.
1375
1376            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1377            let account_history_exists = self
1378                .storage_provider
1379                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1380                .and_then(|checkpoint| {
1381                    // return true if the block number is ahead of the prune checkpoint.
1382                    //
1383                    // The checkpoint stores the highest pruned block number, so we should make
1384                    // sure the block_number is strictly greater.
1385                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1386                })
1387                .unwrap_or(true);
1388
1389            if !account_history_exists {
1390                return Err(ProviderError::StateAtBlockPruned(block_number))
1391            }
1392
1393            self.storage_provider.account_block_changeset(block_number)
1394        }
1395    }
1396
1397    fn get_account_before_block(
1398        &self,
1399        block_number: BlockNumber,
1400        address: Address,
1401    ) -> ProviderResult<Option<AccountBeforeTx>> {
1402        if let Some(state) =
1403            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1404        {
1405            // Search in-memory state for the account changeset
1406            let changeset = state
1407                .block_ref()
1408                .execution_output
1409                .bundle
1410                .reverts
1411                .clone()
1412                .to_plain_state_reverts()
1413                .accounts
1414                .into_iter()
1415                .flatten()
1416                .find(|(addr, _)| addr == &address)
1417                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1418            Ok(changeset)
1419        } else {
1420            // Perform checks on whether or not changesets exist for the block.
1421            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1422            let account_history_exists = self
1423                .storage_provider
1424                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1425                .and_then(|checkpoint| {
1426                    // return true if the block number is ahead of the prune checkpoint.
1427                    //
1428                    // The checkpoint stores the highest pruned block number, so we should make
1429                    // sure the block_number is strictly greater.
1430                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1431                })
1432                .unwrap_or(true);
1433
1434            if !account_history_exists {
1435                return Err(ProviderError::StateAtBlockPruned(block_number))
1436            }
1437
1438            // Delegate to the storage provider for database lookups
1439            self.storage_provider.get_account_before_block(block_number, address)
1440        }
1441    }
1442}
1443
1444impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1445    /// Get basic account information.
1446    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1447        // use latest state provider
1448        let state_provider = self.latest_ref()?;
1449        state_provider.basic_account(address)
1450    }
1451}
1452
1453impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1454    type Receipt = ReceiptTy<N>;
1455
1456    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1457    ///
1458    /// If data for the block does not exist, this will return [`None`].
1459    ///
1460    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1461    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1462    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1463    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1464    /// first place.
1465    fn get_state(
1466        &self,
1467        block: BlockNumber,
1468    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1469        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1470            let state = state.block_ref().execution_outcome().clone();
1471            Ok(Some(state))
1472        } else {
1473            Self::get_state(self, block..=block)
1474        }
1475    }
1476}
1477
impl<N: ProviderNodeTypes> TrieReader for ConsistentProvider<N> {
    /// Forwards the request for trie reverts starting at `from` to the underlying storage
    /// provider; nothing else is consulted here.
    fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
        self.storage_provider.trie_reverts(from)
    }

    /// Forwards the request for the trie updates of `block_number` to the underlying storage
    /// provider; nothing else is consulted here.
    fn get_block_trie_updates(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<TrieUpdatesSorted> {
        self.storage_provider.get_block_trie_updates(block_number)
    }
}
1490
1491#[cfg(test)]
1492mod tests {
1493    use crate::{
1494        providers::blockchain_provider::BlockchainProvider,
1495        test_utils::create_test_provider_factory, BlockWriter,
1496    };
1497    use alloy_eips::BlockHashOrNumber;
1498    use alloy_primitives::B256;
1499    use itertools::Itertools;
1500    use rand::Rng;
1501    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1502    use reth_db_api::models::AccountBeforeTx;
1503    use reth_ethereum_primitives::Block;
1504    use reth_execution_types::ExecutionOutcome;
1505    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1506    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1507    use reth_testing_utils::generators::{
1508        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1509    };
1510    use revm_database::BundleState;
1511    use std::{
1512        ops::{Bound, Range, RangeBounds},
1513        sync::Arc,
1514    };
1515
    /// Number of database blocks requested from `random_blocks` by `test_changeset_reader`.
    const TEST_BLOCKS_COUNT: usize = 5;
1517
1518    fn random_blocks(
1519        rng: &mut impl Rng,
1520        database_blocks: usize,
1521        in_memory_blocks: usize,
1522        requests_count: Option<Range<u8>>,
1523        withdrawals_count: Option<Range<u8>>,
1524        tx_count: impl RangeBounds<u8>,
1525    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1526        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1527
1528        let tx_start = match tx_count.start_bound() {
1529            Bound::Included(&n) | Bound::Excluded(&n) => n,
1530            Bound::Unbounded => u8::MIN,
1531        };
1532        let tx_end = match tx_count.end_bound() {
1533            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1534            Bound::Unbounded => u8::MAX,
1535        };
1536
1537        let blocks = random_block_range(
1538            rng,
1539            0..=block_range,
1540            BlockRangeParams {
1541                parent: Some(B256::ZERO),
1542                tx_count: tx_start..tx_end,
1543                requests_count,
1544                withdrawals_count,
1545            },
1546        );
1547        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1548        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1549    }
1550
1551    #[test]
1552    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1553        // Initialize random number generator and provider factory
1554        let mut rng = generators::rng();
1555        let factory = create_test_provider_factory();
1556
1557        // Generate 10 random blocks and split into database and in-memory blocks
1558        let blocks = random_block_range(
1559            &mut rng,
1560            0..=10,
1561            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1562        );
1563        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1564
1565        // Insert first 5 blocks into the database
1566        let provider_rw = factory.provider_rw()?;
1567        for block in database_blocks {
1568            provider_rw.insert_block(
1569                block.clone().try_recover().expect("failed to seal block with senders"),
1570            )?;
1571        }
1572        provider_rw.commit()?;
1573
1574        // Create a new provider
1575        let provider = BlockchainProvider::new(factory)?;
1576        let consistent_provider = provider.consistent_provider()?;
1577
1578        // Useful blocks
1579        let first_db_block = database_blocks.first().unwrap();
1580        let first_in_mem_block = in_memory_blocks.first().unwrap();
1581        let last_in_mem_block = in_memory_blocks.last().unwrap();
1582
1583        // No block in memory before setting in memory state
1584        assert_eq!(
1585            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1586            None
1587        );
1588        assert_eq!(
1589            consistent_provider
1590                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1591            None
1592        );
1593        // No pending block in memory
1594        assert_eq!(
1595            consistent_provider
1596                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1597            None
1598        );
1599
1600        // Insert first block into the in-memory state
1601        let in_memory_block_senders =
1602            first_in_mem_block.senders().expect("failed to recover senders");
1603        let chain = NewCanonicalChain::Commit {
1604            new: vec![ExecutedBlock {
1605                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1606                    first_in_mem_block.clone(),
1607                    in_memory_block_senders,
1608                )),
1609                ..Default::default()
1610            }],
1611        };
1612        consistent_provider.canonical_in_memory_state.update_chain(chain);
1613        let consistent_provider = provider.consistent_provider()?;
1614
1615        // Now the block should be found in memory
1616        assert_eq!(
1617            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1618            Some(first_in_mem_block.clone().into_block())
1619        );
1620        assert_eq!(
1621            consistent_provider
1622                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1623            Some(first_in_mem_block.clone().into_block())
1624        );
1625
1626        // Find the first block in database by hash
1627        assert_eq!(
1628            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1629            Some(first_db_block.clone().into_block())
1630        );
1631        assert_eq!(
1632            consistent_provider
1633                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1634            Some(first_db_block.clone().into_block())
1635        );
1636
1637        // No pending block in database
1638        assert_eq!(
1639            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1640            None
1641        );
1642
1643        // Insert the last block into the pending state
1644        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1645            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1646                last_in_mem_block.clone(),
1647                Default::default(),
1648            )),
1649            ..Default::default()
1650        });
1651
1652        // Now the last block should be found in memory
1653        assert_eq!(
1654            consistent_provider
1655                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1656            Some(last_in_mem_block.clone_block())
1657        );
1658
1659        Ok(())
1660    }
1661
1662    #[test]
1663    fn test_block_reader_block() -> eyre::Result<()> {
1664        // Initialize random number generator and provider factory
1665        let mut rng = generators::rng();
1666        let factory = create_test_provider_factory();
1667
1668        // Generate 10 random blocks and split into database and in-memory blocks
1669        let blocks = random_block_range(
1670            &mut rng,
1671            0..=10,
1672            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1673        );
1674        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1675
1676        // Insert first 5 blocks into the database
1677        let provider_rw = factory.provider_rw()?;
1678        for block in database_blocks {
1679            provider_rw.insert_block(
1680                block.clone().try_recover().expect("failed to seal block with senders"),
1681            )?;
1682        }
1683        provider_rw.commit()?;
1684
1685        // Create a new provider
1686        let provider = BlockchainProvider::new(factory)?;
1687        let consistent_provider = provider.consistent_provider()?;
1688
1689        // First in memory block
1690        let first_in_mem_block = in_memory_blocks.first().unwrap();
1691        // First database block
1692        let first_db_block = database_blocks.first().unwrap();
1693
1694        // First in memory block should not be found yet as not integrated to the in-memory state
1695        assert_eq!(
1696            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1697            None
1698        );
1699        assert_eq!(
1700            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1701            None
1702        );
1703
1704        // Insert first block into the in-memory state
1705        let in_memory_block_senders =
1706            first_in_mem_block.senders().expect("failed to recover senders");
1707        let chain = NewCanonicalChain::Commit {
1708            new: vec![ExecutedBlock {
1709                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1710                    first_in_mem_block.clone(),
1711                    in_memory_block_senders,
1712                )),
1713                ..Default::default()
1714            }],
1715        };
1716        consistent_provider.canonical_in_memory_state.update_chain(chain);
1717
1718        let consistent_provider = provider.consistent_provider()?;
1719
1720        // First in memory block should be found
1721        assert_eq!(
1722            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1723            Some(first_in_mem_block.clone().into_block())
1724        );
1725        assert_eq!(
1726            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1727            Some(first_in_mem_block.clone().into_block())
1728        );
1729
1730        // First database block should be found
1731        assert_eq!(
1732            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1733            Some(first_db_block.clone().into_block())
1734        );
1735        assert_eq!(
1736            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1737            Some(first_db_block.clone().into_block())
1738        );
1739
1740        Ok(())
1741    }
1742
1743    #[test]
1744    fn test_changeset_reader() -> eyre::Result<()> {
1745        let mut rng = generators::rng();
1746
1747        let (database_blocks, in_memory_blocks) =
1748            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1749
1750        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1751        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1752        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1753
1754        let accounts = random_eoa_accounts(&mut rng, 2);
1755
1756        let (database_changesets, database_state) = random_changeset_range(
1757            &mut rng,
1758            &database_blocks,
1759            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1760            0..0,
1761            0..0,
1762        );
1763        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1764            &mut rng,
1765            &in_memory_blocks,
1766            database_state
1767                .iter()
1768                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1769            0..0,
1770            0..0,
1771        );
1772
1773        let factory = create_test_provider_factory();
1774
1775        let provider_rw = factory.provider_rw()?;
1776        provider_rw.append_blocks_with_state(
1777            database_blocks
1778                .into_iter()
1779                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1780                .collect(),
1781            &ExecutionOutcome {
1782                bundle: BundleState::new(
1783                    database_state.into_iter().map(|(address, (account, _))| {
1784                        (address, None, Some(account.into()), Default::default())
1785                    }),
1786                    database_changesets
1787                        .iter()
1788                        .map(|block_changesets| {
1789                            block_changesets.iter().map(|(address, account, _)| {
1790                                (*address, Some(Some((*account).into())), [])
1791                            })
1792                        })
1793                        .collect::<Vec<_>>(),
1794                    Vec::new(),
1795                ),
1796                first_block: first_database_block,
1797                ..Default::default()
1798            },
1799            Default::default(),
1800        )?;
1801        provider_rw.commit()?;
1802
1803        let provider = BlockchainProvider::new(factory)?;
1804
1805        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1806        let chain = NewCanonicalChain::Commit {
1807            new: vec![in_memory_blocks
1808                .first()
1809                .map(|block| {
1810                    let senders = block.senders().expect("failed to recover senders");
1811                    ExecutedBlock {
1812                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
1813                            block.clone(),
1814                            senders,
1815                        )),
1816                        execution_output: Arc::new(ExecutionOutcome {
1817                            bundle: BundleState::new(
1818                                in_memory_state.into_iter().map(|(address, (account, _))| {
1819                                    (address, None, Some(account.into()), Default::default())
1820                                }),
1821                                [in_memory_changesets.iter().map(|(address, account, _)| {
1822                                    (*address, Some(Some((*account).into())), Vec::new())
1823                                })],
1824                                [],
1825                            ),
1826                            first_block: first_in_memory_block,
1827                            ..Default::default()
1828                        }),
1829                        ..Default::default()
1830                    }
1831                })
1832                .unwrap()],
1833        };
1834        provider.canonical_in_memory_state.update_chain(chain);
1835
1836        let consistent_provider = provider.consistent_provider()?;
1837
1838        assert_eq!(
1839            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1840            database_changesets
1841                .into_iter()
1842                .next_back()
1843                .unwrap()
1844                .into_iter()
1845                .sorted_by_key(|(address, _, _)| *address)
1846                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1847                .collect::<Vec<_>>()
1848        );
1849        assert_eq!(
1850            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1851            in_memory_changesets
1852                .into_iter()
1853                .sorted_by_key(|(address, _, _)| *address)
1854                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1855                .collect::<Vec<_>>()
1856        );
1857
1858        Ok(())
1859    }
1860}