reth_provider/providers/consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4    BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7    TransactionsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12    HashOrNumber,
13};
14use alloy_primitives::{
15    map::{hash_map, HashMap},
16    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24    Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30    StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
40/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
41/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
42/// this provider.
43///
44/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
45/// time-out.
46#[derive(Debug)]
47#[doc(hidden)] // triggers ICE for `cargo docs`
48pub struct ConsistentProvider<N: ProviderNodeTypes> {
49    /// Storage provider.
50    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51    /// Head block at time of [`Self`] creation
52    head_block: Option<Arc<BlockState<N::Primitives>>>,
53    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
54    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
218    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
219    /// storage and account changesets.
220    fn populate_bundle_state(
221        &self,
222        account_changeset: Vec<(u64, AccountBeforeTx)>,
223        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224        block_range_end: BlockNumber,
225    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226        let mut state: BundleStateInit = HashMap::default();
227        let mut reverts: RevertsInit = HashMap::default();
228        let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230        // add account changeset changes
231        for (block_number, account_before) in account_changeset.into_iter().rev() {
232            let AccountBeforeTx { info: old_info, address } = account_before;
233            match state.entry(address) {
234                hash_map::Entry::Vacant(entry) => {
235                    let new_info = state_provider.basic_account(&address)?;
236                    entry.insert((old_info, new_info, HashMap::default()));
237                }
238                hash_map::Entry::Occupied(mut entry) => {
239                    // overwrite old account state.
240                    entry.get_mut().0 = old_info;
241                }
242            }
243            // insert old info into reverts.
244            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245        }
246
247        // add storage changeset changes
248        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249            let BlockNumberAddress((block_number, address)) = block_and_address;
250            // get account state or insert from plain state.
251            let account_state = match state.entry(address) {
252                hash_map::Entry::Vacant(entry) => {
253                    let present_info = state_provider.basic_account(&address)?;
254                    entry.insert((present_info, present_info, HashMap::default()))
255                }
256                hash_map::Entry::Occupied(entry) => entry.into_mut(),
257            };
258
259            // match storage.
260            match account_state.2.entry(old_storage.key) {
261                hash_map::Entry::Vacant(entry) => {
262                    let new_storage_value =
263                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264                    entry.insert((old_storage.value, new_storage_value));
265                }
266                hash_map::Entry::Occupied(mut entry) => {
267                    entry.get_mut().0 = old_storage.value;
268                }
269            };
270
271            reverts
272                .entry(block_number)
273                .or_default()
274                .entry(address)
275                .or_default()
276                .1
277                .push(old_storage);
278        }
279
280        Ok((state, reverts))
281    }
282
283    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
284    /// is met.
285    ///
286    /// Creates a snapshot of the in-memory chain state and database provider to prevent
287    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
288    /// recent in-memory blocks in case of overlaps.
289    ///
290    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
291    ///   user to retrieve the required items from the database using [`RangeInclusive`].
292    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
293    ///   state, allowing for selection or filtering for the desired data.
294    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295        &self,
296        range: impl RangeBounds<BlockNumber>,
297        fetch_db_range: F,
298        map_block_state_item: G,
299        mut predicate: P,
300    ) -> ProviderResult<Vec<T>>
301    where
302        F: FnOnce(
303            &DatabaseProviderRO<N::DB, N>,
304            RangeInclusive<BlockNumber>,
305            &mut P,
306        ) -> ProviderResult<Vec<T>>,
307        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308        P: FnMut(&T) -> bool,
309    {
310        // Each one provides a snapshot at the time of instantiation, but its order matters.
311        //
312        // If we acquire first the database provider, it's possible that before the in-memory chain
313        // snapshot is instantiated, it will flush blocks to disk. This would
314        // mean that our database provider would not have access to the flushed blocks (since it's
315        // working under an older view), while the in-memory state may have deleted them
316        // entirely. Resulting in gaps on the range.
317        let mut in_memory_chain =
318            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319        let db_provider = &self.storage_provider;
320
321        let (start, end) = self.convert_range_bounds(range, || {
322            // the first block is the highest one.
323            in_memory_chain
324                .first()
325                .map(|b| b.number())
326                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327        });
328
329        if start > end {
330            return Ok(vec![])
331        }
332
333        // Split range into storage_range and in-memory range. If the in-memory range is not
334        // necessary drop it early.
335        //
336        // The last block of `in_memory_chain` is the lowest block number.
337        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338            Some(lowest_memory_block) if lowest_memory_block <= end => {
339                let highest_memory_block =
340                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342                // Database will for a time overlap with in-memory-chain blocks. In
343                // case of a re-org, it can mean that the database blocks are of a forked chain, and
344                // so, we should prioritize the in-memory overlapped blocks.
345                let in_memory_range =
346                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348                // If requested range is in the middle of the in-memory range, remove the necessary
349                // lowest blocks
350                in_memory_chain.truncate(
351                    in_memory_chain
352                        .len()
353                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354                );
355
356                let storage_range =
357                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359                (Some((in_memory_chain, in_memory_range)), storage_range)
360            }
361            _ => {
362                // Drop the in-memory chain so we don't hold blocks in memory.
363                drop(in_memory_chain);
364
365                (None, Some(start..=end))
366            }
367        };
368
369        let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371        if let Some(storage_range) = storage_range {
372            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373            items.append(&mut db_items);
374
375            // The predicate was not met, if the number of items differs from the expected. So, we
376            // return what we have.
377            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378                return Ok(items)
379            }
380        }
381
382        if let Some((in_memory_chain, in_memory_range)) = in_memory {
383            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384                debug_assert!(num == block.number());
385                if let Some(item) = map_block_state_item(block, &mut predicate) {
386                    items.push(item);
387                } else {
388                    break
389                }
390            }
391        }
392
393        Ok(items)
394    }
395
396    /// This uses a given [`BlockState`] to initialize a state provider for that block.
397    fn block_state_provider_ref(
398        &self,
399        state: &BlockState<N::Primitives>,
400    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401        let anchor_hash = state.anchor().hash;
402        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405    }
406
407    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
408    ///
409    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
410    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
411    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
412    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413        &self,
414        range: impl RangeBounds<BlockNumber>,
415        fetch_from_db: S,
416        fetch_from_block_state: M,
417    ) -> ProviderResult<Vec<R>>
418    where
419        S: FnOnce(
420            &DatabaseProviderRO<N::DB, N>,
421            RangeInclusive<TxNumber>,
422        ) -> ProviderResult<Vec<R>>,
423        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424    {
425        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426        let provider = &self.storage_provider;
427
428        // Get the last block number stored in the storage which does NOT overlap with in-memory
429        // chain.
430        let last_database_block_number = in_mem_chain
431            .last()
432            .map(|b| Ok(b.anchor().number))
433            .unwrap_or_else(|| provider.last_block_number())?;
434
435        // Get the next tx number for the last block stored in the storage, which marks the start of
436        // the in-memory state.
437        let last_block_body_index = provider
438            .block_body_indices(last_database_block_number)?
439            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442        let (start, end) = self.convert_range_bounds(range, || {
443            in_mem_chain
444                .iter()
445                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446                .sum::<u64>() +
447                last_block_body_index.last_tx_num()
448        });
449
450        if start > end {
451            return Ok(vec![])
452        }
453
454        let mut tx_range = start..=end;
455
456        // If the range is entirely before the first in-memory transaction number, fetch from
457        // storage
458        if *tx_range.end() < in_memory_tx_num {
459            return fetch_from_db(provider, tx_range);
460        }
461
462        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464        // If the range spans storage and memory, get elements from storage first.
465        if *tx_range.start() < in_memory_tx_num {
466            // Determine the range that needs to be fetched from storage.
467            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469            // Set the remaining transaction range for in-memory
470            tx_range = in_memory_tx_num..=*tx_range.end();
471
472            items.extend(fetch_from_db(provider, db_range)?);
473        }
474
475        // Iterate from the lowest block to the highest in-memory chain
476        for block_state in in_mem_chain.iter().rev() {
477            let block_tx_count =
478                block_state.block_ref().recovered_block().body().transactions().len();
479            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481            // If the transaction range start is equal or higher than the next block first
482            // transaction, advance
483            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484                in_memory_tx_num += block_tx_count as u64;
485                continue
486            }
487
488            // This should only be more than 0 once, in case of a partial range inside a block.
489            let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491            items.extend(fetch_from_block_state(
492                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493                block_state,
494            )?);
495
496            in_memory_tx_num += block_tx_count as u64;
497
498            // Break if the range has been fully processed
499            if in_memory_tx_num > *tx_range.end() {
500                break
501            }
502
503            // Set updated range
504            tx_range = in_memory_tx_num..=*tx_range.end();
505        }
506
507        Ok(items)
508    }
509
510    /// Fetches data from either in-memory state or persistent storage by transaction
511    /// [`HashOrNumber`].
512    fn get_in_memory_or_storage_by_tx<S, M, R>(
513        &self,
514        id: HashOrNumber,
515        fetch_from_db: S,
516        fetch_from_block_state: M,
517    ) -> ProviderResult<Option<R>>
518    where
519        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521    {
522        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523        let provider = &self.storage_provider;
524
525        // Get the last block number stored in the database which does NOT overlap with in-memory
526        // chain.
527        let last_database_block_number = in_mem_chain
528            .last()
529            .map(|b| Ok(b.anchor().number))
530            .unwrap_or_else(|| provider.last_block_number())?;
531
532        // Get the next tx number for the last block stored in the database and consider it the
533        // first tx number of the in-memory state
534        let last_block_body_index = provider
535            .block_body_indices(last_database_block_number)?
536            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539        // If the transaction number is less than the first in-memory transaction number, make a
540        // database lookup
541        if let HashOrNumber::Number(id) = id {
542            if id < in_memory_tx_num {
543                return fetch_from_db(provider)
544            }
545        }
546
547        // Iterate from the lowest block to the highest
548        for block_state in in_mem_chain.iter().rev() {
549            let executed_block = block_state.block_ref();
550            let block = executed_block.recovered_block();
551
552            for tx_index in 0..block.body().transactions().len() {
553                match id {
554                    HashOrNumber::Hash(tx_hash) => {
555                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557                        }
558                    }
559                    HashOrNumber::Number(id) => {
560                        if id == in_memory_tx_num {
561                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562                        }
563                    }
564                }
565
566                in_memory_tx_num += 1;
567            }
568        }
569
570        // Not found in-memory, so check database.
571        if let HashOrNumber::Hash(_) = id {
572            return fetch_from_db(provider)
573        }
574
575        Ok(None)
576    }
577
578    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
579    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580        &self,
581        id: BlockHashOrNumber,
582        fetch_from_db: S,
583        fetch_from_block_state: M,
584    ) -> ProviderResult<R>
585    where
586        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588    {
589        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590            return fetch_from_block_state(block_state)
591        }
592        fetch_from_db(&self.storage_provider)
593    }
594
595    /// Consumes the provider and returns a state provider for the specific block hash.
596    pub(crate) fn into_state_provider_at_block_hash(
597        self,
598        block_hash: BlockHash,
599    ) -> ProviderResult<Box<dyn StateProvider>> {
600        let Self { storage_provider, head_block, .. } = self;
601        let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
602            let block_number = storage_provider
603                .block_number(block_hash)?
604                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605            storage_provider.try_into_history_at_block(block_number)
606        };
607        if let Some(Some(block_state)) =
608            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609        {
610            let anchor_hash = block_state.anchor().hash;
611            let latest_historical = into_history_at_block_hash(anchor_hash)?;
612            return Ok(Box::new(block_state.state_provider(latest_historical)));
613        }
614        into_history_at_block_hash(block_hash)
615    }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619    /// Ensures that the given block number is canonical (synced)
620    ///
621    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
622    /// out of range and would lead to invalid results, mainly during initial sync.
623    ///
624    /// Verifying the `block_number` would be expensive since we need to lookup sync table
625    /// Instead, we ensure that the `block_number` is within the range of the
626    /// [`Self::best_block_number`] which is updated when a block is synced.
627    #[inline]
628    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629        let latest = self.best_block_number()?;
630        if block_number > latest {
631            Err(ProviderError::HeaderNotFound(block_number.into()))
632        } else {
633            Ok(())
634        }
635    }
636}
637
638impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
639    type Primitives = N::Primitives;
640}
641
642impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
643    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
644        self.storage_provider.static_file_provider()
645    }
646}
647
648impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
649    type Header = HeaderTy<N>;
650
651    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
652        self.get_in_memory_or_storage_by_block(
653            (*block_hash).into(),
654            |db_provider| db_provider.header(block_hash),
655            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
656        )
657    }
658
659    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
660        self.get_in_memory_or_storage_by_block(
661            num.into(),
662            |db_provider| db_provider.header_by_number(num),
663            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664        )
665    }
666
667    fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
668        if let Some(num) = self.block_number(*hash)? {
669            self.header_td_by_number(num)
670        } else {
671            Ok(None)
672        }
673    }
674
675    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
676        let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
677        {
678            // If the block exists in memory, we should return a TD for it.
679            //
680            // The canonical in memory state should only store post-merge blocks. Post-merge blocks
681            // have zero difficulty. This means we can use the total difficulty for the last
682            // finalized block number if present (so that we are not affected by reorgs), if not the
683            // last number in the database will be used.
684            if let Some(last_finalized_num_hash) =
685                self.canonical_in_memory_state.get_finalized_num_hash()
686            {
687                last_finalized_num_hash.number
688            } else {
689                self.last_block_number()?
690            }
691        } else {
692            // Otherwise, return what we have on disk for the input block
693            number
694        };
695        self.storage_provider.header_td_by_number(number)
696    }
697
698    fn headers_range(
699        &self,
700        range: impl RangeBounds<BlockNumber>,
701    ) -> ProviderResult<Vec<Self::Header>> {
702        self.get_in_memory_or_storage_by_block_range_while(
703            range,
704            |db_provider, range, _| db_provider.headers_range(range),
705            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
706            |_| true,
707        )
708    }
709
710    fn sealed_header(
711        &self,
712        number: BlockNumber,
713    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
714        self.get_in_memory_or_storage_by_block(
715            number.into(),
716            |db_provider| db_provider.sealed_header(number),
717            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
718        )
719    }
720
721    fn sealed_headers_range(
722        &self,
723        range: impl RangeBounds<BlockNumber>,
724    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
725        self.get_in_memory_or_storage_by_block_range_while(
726            range,
727            |db_provider, range, _| db_provider.sealed_headers_range(range),
728            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
729            |_| true,
730        )
731    }
732
733    fn sealed_headers_while(
734        &self,
735        range: impl RangeBounds<BlockNumber>,
736        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
737    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
738        self.get_in_memory_or_storage_by_block_range_while(
739            range,
740            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
741            |block_state, predicate| {
742                let header = block_state.block_ref().recovered_block().sealed_header();
743                predicate(header).then(|| header.clone())
744            },
745            predicate,
746        )
747    }
748}
749
750impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
751    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
752        self.get_in_memory_or_storage_by_block(
753            number.into(),
754            |db_provider| db_provider.block_hash(number),
755            |block_state| Ok(Some(block_state.hash())),
756        )
757    }
758
759    fn canonical_hashes_range(
760        &self,
761        start: BlockNumber,
762        end: BlockNumber,
763    ) -> ProviderResult<Vec<B256>> {
764        self.get_in_memory_or_storage_by_block_range_while(
765            start..end,
766            |db_provider, inclusive_range, _| {
767                db_provider
768                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
769            },
770            |block_state, _| Some(block_state.hash()),
771            |_| true,
772        )
773    }
774}
775
776impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
777    fn chain_info(&self) -> ProviderResult<ChainInfo> {
778        let best_number = self.best_block_number()?;
779        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
780    }
781
782    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
783        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
784    }
785
786    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
787        self.storage_provider.last_block_number()
788    }
789
790    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
791        self.get_in_memory_or_storage_by_block(
792            hash.into(),
793            |db_provider| db_provider.block_number(hash),
794            |block_state| Ok(Some(block_state.number())),
795        )
796    }
797}
798
799impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
800    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
801        Ok(self.canonical_in_memory_state.pending_block_num_hash())
802    }
803
804    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
805        Ok(self.canonical_in_memory_state.get_safe_num_hash())
806    }
807
808    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
809        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
810    }
811}
812
813impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
814    type Block = BlockTy<N>;
815
816    fn find_block_by_hash(
817        &self,
818        hash: B256,
819        source: BlockSource,
820    ) -> ProviderResult<Option<Self::Block>> {
821        if matches!(source, BlockSource::Canonical | BlockSource::Any) {
822            if let Some(block) = self.get_in_memory_or_storage_by_block(
823                hash.into(),
824                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
825                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
826            )? {
827                return Ok(Some(block))
828            }
829        }
830
831        if matches!(source, BlockSource::Pending | BlockSource::Any) {
832            return Ok(self
833                .canonical_in_memory_state
834                .pending_block()
835                .filter(|b| b.hash() == hash)
836                .map(|b| b.into_block()))
837        }
838
839        Ok(None)
840    }
841
842    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
843        self.get_in_memory_or_storage_by_block(
844            id,
845            |db_provider| db_provider.block(id),
846            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
847        )
848    }
849
850    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851        Ok(self.canonical_in_memory_state.pending_recovered_block())
852    }
853
854    fn pending_block_and_receipts(
855        &self,
856    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
857        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
858    }
859
860    /// Returns the block with senders with matching number or hash from database.
861    ///
862    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
863    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
864    ///
865    /// Returns `None` if block is not found.
866    fn recovered_block(
867        &self,
868        id: BlockHashOrNumber,
869        transaction_kind: TransactionVariant,
870    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
871        self.get_in_memory_or_storage_by_block(
872            id,
873            |db_provider| db_provider.recovered_block(id, transaction_kind),
874            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
875        )
876    }
877
878    fn sealed_block_with_senders(
879        &self,
880        id: BlockHashOrNumber,
881        transaction_kind: TransactionVariant,
882    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
883        self.get_in_memory_or_storage_by_block(
884            id,
885            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
886            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
887        )
888    }
889
890    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
891        self.get_in_memory_or_storage_by_block_range_while(
892            range,
893            |db_provider, range, _| db_provider.block_range(range),
894            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
895            |_| true,
896        )
897    }
898
899    fn block_with_senders_range(
900        &self,
901        range: RangeInclusive<BlockNumber>,
902    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
903        self.get_in_memory_or_storage_by_block_range_while(
904            range,
905            |db_provider, range, _| db_provider.block_with_senders_range(range),
906            |block_state, _| Some(block_state.block().recovered_block().clone()),
907            |_| true,
908        )
909    }
910
911    fn recovered_block_range(
912        &self,
913        range: RangeInclusive<BlockNumber>,
914    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
915        self.get_in_memory_or_storage_by_block_range_while(
916            range,
917            |db_provider, range, _| db_provider.recovered_block_range(range),
918            |block_state, _| Some(block_state.block().recovered_block().clone()),
919            |_| true,
920        )
921    }
922}
923
924impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
925    type Transaction = TxTy<N>;
926
927    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
928        self.get_in_memory_or_storage_by_tx(
929            tx_hash.into(),
930            |db_provider| db_provider.transaction_id(tx_hash),
931            |_, tx_number, _| Ok(Some(tx_number)),
932        )
933    }
934
935    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
936        self.get_in_memory_or_storage_by_tx(
937            id.into(),
938            |provider| provider.transaction_by_id(id),
939            |tx_index, _, block_state| {
940                Ok(block_state
941                    .block_ref()
942                    .recovered_block()
943                    .body()
944                    .transactions()
945                    .get(tx_index)
946                    .cloned())
947            },
948        )
949    }
950
951    fn transaction_by_id_unhashed(
952        &self,
953        id: TxNumber,
954    ) -> ProviderResult<Option<Self::Transaction>> {
955        self.get_in_memory_or_storage_by_tx(
956            id.into(),
957            |provider| provider.transaction_by_id_unhashed(id),
958            |tx_index, _, block_state| {
959                Ok(block_state
960                    .block_ref()
961                    .recovered_block()
962                    .body()
963                    .transactions()
964                    .get(tx_index)
965                    .cloned())
966            },
967        )
968    }
969
970    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
971        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
972            return Ok(Some(tx))
973        }
974
975        self.storage_provider.transaction_by_hash(hash)
976    }
977
978    fn transaction_by_hash_with_meta(
979        &self,
980        tx_hash: TxHash,
981    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
982        if let Some((tx, meta)) =
983            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
984        {
985            return Ok(Some((tx, meta)))
986        }
987
988        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
989    }
990
991    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
992        self.get_in_memory_or_storage_by_tx(
993            id.into(),
994            |provider| provider.transaction_block(id),
995            |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
996        )
997    }
998
999    fn transactions_by_block(
1000        &self,
1001        id: BlockHashOrNumber,
1002    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1003        self.get_in_memory_or_storage_by_block(
1004            id,
1005            |provider| provider.transactions_by_block(id),
1006            |block_state| {
1007                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
1008            },
1009        )
1010    }
1011
1012    fn transactions_by_block_range(
1013        &self,
1014        range: impl RangeBounds<BlockNumber>,
1015    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1016        self.get_in_memory_or_storage_by_block_range_while(
1017            range,
1018            |db_provider, range, _| db_provider.transactions_by_block_range(range),
1019            |block_state, _| {
1020                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1021            },
1022            |_| true,
1023        )
1024    }
1025
1026    fn transactions_by_tx_range(
1027        &self,
1028        range: impl RangeBounds<TxNumber>,
1029    ) -> ProviderResult<Vec<Self::Transaction>> {
1030        self.get_in_memory_or_storage_by_tx_range(
1031            range,
1032            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1033            |index_range, block_state| {
1034                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1035                    .to_vec())
1036            },
1037        )
1038    }
1039
1040    fn senders_by_tx_range(
1041        &self,
1042        range: impl RangeBounds<TxNumber>,
1043    ) -> ProviderResult<Vec<Address>> {
1044        self.get_in_memory_or_storage_by_tx_range(
1045            range,
1046            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1047            |index_range, block_state| {
1048                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1049            },
1050        )
1051    }
1052
1053    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1054        self.get_in_memory_or_storage_by_tx(
1055            id.into(),
1056            |provider| provider.transaction_sender(id),
1057            |tx_index, _, block_state| {
1058                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1059            },
1060        )
1061    }
1062}
1063
1064impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1065    type Receipt = ReceiptTy<N>;
1066
1067    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1068        self.get_in_memory_or_storage_by_tx(
1069            id.into(),
1070            |provider| provider.receipt(id),
1071            |tx_index, _, block_state| {
1072                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1073            },
1074        )
1075    }
1076
1077    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1078        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1079            let executed_block = block_state.block_ref();
1080            let block = executed_block.recovered_block();
1081            let receipts = block_state.executed_block_receipts();
1082
1083            // assuming 1:1 correspondence between transactions and receipts
1084            debug_assert_eq!(
1085                block.body().transactions().len(),
1086                receipts.len(),
1087                "Mismatch between transaction and receipt count"
1088            );
1089
1090            if let Some(tx_index) =
1091                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1092            {
1093                // safe to use tx_index for receipts due to 1:1 correspondence
1094                return Ok(receipts.get(tx_index).cloned());
1095            }
1096        }
1097
1098        self.storage_provider.receipt_by_hash(hash)
1099    }
1100
1101    fn receipts_by_block(
1102        &self,
1103        block: BlockHashOrNumber,
1104    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1105        self.get_in_memory_or_storage_by_block(
1106            block,
1107            |db_provider| db_provider.receipts_by_block(block),
1108            |block_state| Ok(Some(block_state.executed_block_receipts())),
1109        )
1110    }
1111
1112    fn receipts_by_tx_range(
1113        &self,
1114        range: impl RangeBounds<TxNumber>,
1115    ) -> ProviderResult<Vec<Self::Receipt>> {
1116        self.get_in_memory_or_storage_by_tx_range(
1117            range,
1118            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1119            |index_range, block_state| {
1120                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1121            },
1122        )
1123    }
1124
1125    fn receipts_by_block_range(
1126        &self,
1127        block_range: RangeInclusive<BlockNumber>,
1128    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1129        self.storage_provider.receipts_by_block_range(block_range)
1130    }
1131}
1132
1133impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1134    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1135        match block {
1136            BlockId::Hash(rpc_block_hash) => {
1137                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1138                if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1139                    if let Some(state) = self
1140                        .head_block
1141                        .as_ref()
1142                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1143                    {
1144                        receipts = Some(state.executed_block_receipts());
1145                    }
1146                }
1147                Ok(receipts)
1148            }
1149            BlockId::Number(num_tag) => match num_tag {
1150                BlockNumberOrTag::Pending => Ok(self
1151                    .canonical_in_memory_state
1152                    .pending_state()
1153                    .map(|block_state| block_state.executed_block_receipts())),
1154                _ => {
1155                    if let Some(num) = self.convert_block_number(num_tag)? {
1156                        self.receipts_by_block(num.into())
1157                    } else {
1158                        Ok(None)
1159                    }
1160                }
1161            },
1162        }
1163    }
1164}
1165
1166impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1167    fn block_body_indices(
1168        &self,
1169        number: BlockNumber,
1170    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1171        self.get_in_memory_or_storage_by_block(
1172            number.into(),
1173            |db_provider| db_provider.block_body_indices(number),
1174            |block_state| {
1175                // Find the last block indices on database
1176                let last_storage_block_number = block_state.anchor().number;
1177                let mut stored_indices = self
1178                    .storage_provider
1179                    .block_body_indices(last_storage_block_number)?
1180                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1181
1182                // Prepare our block indices
1183                stored_indices.first_tx_num = stored_indices.next_tx_num();
1184                stored_indices.tx_count = 0;
1185
1186                // Iterate from the lowest block in memory until our target block
1187                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1188                    let block_tx_count =
1189                        state.block_ref().recovered_block().body().transactions().len() as u64;
1190                    if state.block_ref().recovered_block().number() == number {
1191                        stored_indices.tx_count = block_tx_count;
1192                    } else {
1193                        stored_indices.first_tx_num += block_tx_count;
1194                    }
1195                }
1196
1197                Ok(Some(stored_indices))
1198            },
1199        )
1200    }
1201
1202    fn block_body_indices_range(
1203        &self,
1204        range: RangeInclusive<BlockNumber>,
1205    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1206        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1207    }
1208}
1209
1210impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1211    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1212        self.storage_provider.get_stage_checkpoint(id)
1213    }
1214
1215    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1216        self.storage_provider.get_stage_checkpoint_progress(id)
1217    }
1218
1219    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1220        self.storage_provider.get_all_checkpoints()
1221    }
1222}
1223
1224impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1225    fn get_prune_checkpoint(
1226        &self,
1227        segment: PruneSegment,
1228    ) -> ProviderResult<Option<PruneCheckpoint>> {
1229        self.storage_provider.get_prune_checkpoint(segment)
1230    }
1231
1232    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1233        self.storage_provider.get_prune_checkpoints()
1234    }
1235}
1236
impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
    type ChainSpec = N::ChainSpec;

    /// Returns the chain spec of the underlying storage provider.
    fn chain_spec(&self) -> Arc<N::ChainSpec> {
        // Fully-qualified call keeps the lookup pinned to the `ChainSpecProvider` trait method.
        ChainSpecProvider::chain_spec(&self.storage_provider)
    }
}
1244
1245impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1246    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1247        match id {
1248            BlockId::Number(num) => self.block_by_number_or_tag(num),
1249            BlockId::Hash(hash) => {
1250                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1251                // so not at the provider level?
1252                // if we decide to do this at a higher level, then we can make this an automatic
1253                // trait impl
1254                if Some(true) == hash.require_canonical {
1255                    // check the database, canonical blocks are only stored in the database
1256                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1257                } else {
1258                    self.block_by_hash(hash.block_hash)
1259                }
1260            }
1261        }
1262    }
1263
1264    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1265        Ok(match id {
1266            BlockNumberOrTag::Latest => {
1267                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1268            }
1269            BlockNumberOrTag::Finalized => {
1270                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1271            }
1272            BlockNumberOrTag::Safe => {
1273                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1274            }
1275            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1276            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1277
1278            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1279        })
1280    }
1281
1282    fn sealed_header_by_number_or_tag(
1283        &self,
1284        id: BlockNumberOrTag,
1285    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1286        match id {
1287            BlockNumberOrTag::Latest => {
1288                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1289            }
1290            BlockNumberOrTag::Finalized => {
1291                Ok(self.canonical_in_memory_state.get_finalized_header())
1292            }
1293            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1294            BlockNumberOrTag::Earliest => self
1295                .header_by_number(self.earliest_block_number()?)?
1296                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1297            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1298            BlockNumberOrTag::Number(num) => self
1299                .header_by_number(num)?
1300                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1301        }
1302    }
1303
1304    fn sealed_header_by_id(
1305        &self,
1306        id: BlockId,
1307    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1308        Ok(match id {
1309            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1310            BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1311        })
1312    }
1313
1314    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1315        Ok(match id {
1316            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1317            BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1318        })
1319    }
1320}
1321
1322impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1323    fn storage_changeset(
1324        &self,
1325        block_number: BlockNumber,
1326    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1327        if let Some(state) =
1328            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1329        {
1330            let changesets = state
1331                .block()
1332                .execution_output
1333                .bundle
1334                .reverts
1335                .clone()
1336                .to_plain_state_reverts()
1337                .storage
1338                .into_iter()
1339                .flatten()
1340                .flat_map(|revert: PlainStorageRevert| {
1341                    revert.storage_revert.into_iter().map(move |(key, value)| {
1342                        (
1343                            BlockNumberAddress((block_number, revert.address)),
1344                            StorageEntry { key: key.into(), value: value.to_previous_value() },
1345                        )
1346                    })
1347                })
1348                .collect();
1349            Ok(changesets)
1350        } else {
1351            // Perform checks on whether or not changesets exist for the block.
1352
1353            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1354            let storage_history_exists = self
1355                .storage_provider
1356                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1357                .and_then(|checkpoint| {
1358                    // return true if the block number is ahead of the prune checkpoint.
1359                    //
1360                    // The checkpoint stores the highest pruned block number, so we should make
1361                    // sure the block_number is strictly greater.
1362                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1363                })
1364                .unwrap_or(true);
1365
1366            if !storage_history_exists {
1367                return Err(ProviderError::StateAtBlockPruned(block_number))
1368            }
1369
1370            self.storage_provider.storage_changeset(block_number)
1371        }
1372    }
1373}
1374
1375impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1376    fn account_block_changeset(
1377        &self,
1378        block_number: BlockNumber,
1379    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1380        if let Some(state) =
1381            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1382        {
1383            let changesets = state
1384                .block_ref()
1385                .execution_output
1386                .bundle
1387                .reverts
1388                .clone()
1389                .to_plain_state_reverts()
1390                .accounts
1391                .into_iter()
1392                .flatten()
1393                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1394                .collect();
1395            Ok(changesets)
1396        } else {
1397            // Perform checks on whether or not changesets exist for the block.
1398
1399            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1400            let account_history_exists = self
1401                .storage_provider
1402                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1403                .and_then(|checkpoint| {
1404                    // return true if the block number is ahead of the prune checkpoint.
1405                    //
1406                    // The checkpoint stores the highest pruned block number, so we should make
1407                    // sure the block_number is strictly greater.
1408                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1409                })
1410                .unwrap_or(true);
1411
1412            if !account_history_exists {
1413                return Err(ProviderError::StateAtBlockPruned(block_number))
1414            }
1415
1416            self.storage_provider.account_block_changeset(block_number)
1417        }
1418    }
1419}
1420
1421impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1422    /// Get basic account information.
1423    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1424        // use latest state provider
1425        let state_provider = self.latest_ref()?;
1426        state_provider.basic_account(address)
1427    }
1428}
1429
1430impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1431    type Receipt = ReceiptTy<N>;
1432
1433    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1434    ///
1435    /// If data for the block does not exist, this will return [`None`].
1436    ///
1437    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1438    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1439    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1440    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1441    /// first place.
1442    fn get_state(
1443        &self,
1444        block: BlockNumber,
1445    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1446        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1447            let state = state.block_ref().execution_outcome().clone();
1448            Ok(Some(state))
1449        } else {
1450            Self::get_state(self, block..=block)
1451        }
1452    }
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457    use crate::{
1458        providers::blockchain_provider::BlockchainProvider,
1459        test_utils::create_test_provider_factory, BlockWriter,
1460    };
1461    use alloy_eips::BlockHashOrNumber;
1462    use alloy_primitives::B256;
1463    use itertools::Itertools;
1464    use rand::Rng;
1465    use reth_chain_state::{
1466        ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, NewCanonicalChain,
1467    };
1468    use reth_db_api::models::AccountBeforeTx;
1469    use reth_ethereum_primitives::Block;
1470    use reth_execution_types::ExecutionOutcome;
1471    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1472    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1473    use reth_testing_utils::generators::{
1474        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1475    };
1476    use revm_database::BundleState;
1477    use std::{
1478        ops::{Bound, Range, RangeBounds},
1479        sync::Arc,
1480    };
1481
1482    const TEST_BLOCKS_COUNT: usize = 5;
1483
1484    fn random_blocks(
1485        rng: &mut impl Rng,
1486        database_blocks: usize,
1487        in_memory_blocks: usize,
1488        requests_count: Option<Range<u8>>,
1489        withdrawals_count: Option<Range<u8>>,
1490        tx_count: impl RangeBounds<u8>,
1491    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1492        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1493
1494        let tx_start = match tx_count.start_bound() {
1495            Bound::Included(&n) | Bound::Excluded(&n) => n,
1496            Bound::Unbounded => u8::MIN,
1497        };
1498        let tx_end = match tx_count.end_bound() {
1499            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1500            Bound::Unbounded => u8::MAX,
1501        };
1502
1503        let blocks = random_block_range(
1504            rng,
1505            0..=block_range,
1506            BlockRangeParams {
1507                parent: Some(B256::ZERO),
1508                tx_count: tx_start..tx_end,
1509                requests_count,
1510                withdrawals_count,
1511            },
1512        );
1513        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1514        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1515    }
1516
1517    #[test]
1518    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1519        // Initialize random number generator and provider factory
1520        let mut rng = generators::rng();
1521        let factory = create_test_provider_factory();
1522
1523        // Generate 10 random blocks and split into database and in-memory blocks
1524        let blocks = random_block_range(
1525            &mut rng,
1526            0..=10,
1527            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1528        );
1529        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1530
1531        // Insert first 5 blocks into the database
1532        let provider_rw = factory.provider_rw()?;
1533        for block in database_blocks {
1534            provider_rw.insert_historical_block(
1535                block.clone().try_recover().expect("failed to seal block with senders"),
1536            )?;
1537        }
1538        provider_rw.commit()?;
1539
1540        // Create a new provider
1541        let provider = BlockchainProvider::new(factory)?;
1542        let consistent_provider = provider.consistent_provider()?;
1543
1544        // Useful blocks
1545        let first_db_block = database_blocks.first().unwrap();
1546        let first_in_mem_block = in_memory_blocks.first().unwrap();
1547        let last_in_mem_block = in_memory_blocks.last().unwrap();
1548
1549        // No block in memory before setting in memory state
1550        assert_eq!(
1551            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1552            None
1553        );
1554        assert_eq!(
1555            consistent_provider
1556                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1557            None
1558        );
1559        // No pending block in memory
1560        assert_eq!(
1561            consistent_provider
1562                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1563            None
1564        );
1565
1566        // Insert first block into the in-memory state
1567        let in_memory_block_senders =
1568            first_in_mem_block.senders().expect("failed to recover senders");
1569        let chain = NewCanonicalChain::Commit {
1570            new: vec![ExecutedBlockWithTrieUpdates::new(
1571                Arc::new(RecoveredBlock::new_sealed(
1572                    first_in_mem_block.clone(),
1573                    in_memory_block_senders,
1574                )),
1575                Default::default(),
1576                Default::default(),
1577                ExecutedTrieUpdates::empty(),
1578            )],
1579        };
1580        consistent_provider.canonical_in_memory_state.update_chain(chain);
1581        let consistent_provider = provider.consistent_provider()?;
1582
1583        // Now the block should be found in memory
1584        assert_eq!(
1585            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1586            Some(first_in_mem_block.clone().into_block())
1587        );
1588        assert_eq!(
1589            consistent_provider
1590                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1591            Some(first_in_mem_block.clone().into_block())
1592        );
1593
1594        // Find the first block in database by hash
1595        assert_eq!(
1596            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1597            Some(first_db_block.clone().into_block())
1598        );
1599        assert_eq!(
1600            consistent_provider
1601                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1602            Some(first_db_block.clone().into_block())
1603        );
1604
1605        // No pending block in database
1606        assert_eq!(
1607            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1608            None
1609        );
1610
1611        // Insert the last block into the pending state
1612        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1613            block: ExecutedBlock {
1614                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1615                    last_in_mem_block.clone(),
1616                    Default::default(),
1617                )),
1618                execution_output: Default::default(),
1619                hashed_state: Default::default(),
1620            },
1621            trie: ExecutedTrieUpdates::empty(),
1622        });
1623
1624        // Now the last block should be found in memory
1625        assert_eq!(
1626            consistent_provider
1627                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1628            Some(last_in_mem_block.clone_block())
1629        );
1630
1631        Ok(())
1632    }
1633
1634    #[test]
1635    fn test_block_reader_block() -> eyre::Result<()> {
1636        // Initialize random number generator and provider factory
1637        let mut rng = generators::rng();
1638        let factory = create_test_provider_factory();
1639
1640        // Generate 10 random blocks and split into database and in-memory blocks
1641        let blocks = random_block_range(
1642            &mut rng,
1643            0..=10,
1644            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1645        );
1646        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1647
1648        // Insert first 5 blocks into the database
1649        let provider_rw = factory.provider_rw()?;
1650        for block in database_blocks {
1651            provider_rw.insert_historical_block(
1652                block.clone().try_recover().expect("failed to seal block with senders"),
1653            )?;
1654        }
1655        provider_rw.commit()?;
1656
1657        // Create a new provider
1658        let provider = BlockchainProvider::new(factory)?;
1659        let consistent_provider = provider.consistent_provider()?;
1660
1661        // First in memory block
1662        let first_in_mem_block = in_memory_blocks.first().unwrap();
1663        // First database block
1664        let first_db_block = database_blocks.first().unwrap();
1665
1666        // First in memory block should not be found yet as not integrated to the in-memory state
1667        assert_eq!(
1668            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1669            None
1670        );
1671        assert_eq!(
1672            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1673            None
1674        );
1675
1676        // Insert first block into the in-memory state
1677        let in_memory_block_senders =
1678            first_in_mem_block.senders().expect("failed to recover senders");
1679        let chain = NewCanonicalChain::Commit {
1680            new: vec![ExecutedBlockWithTrieUpdates::new(
1681                Arc::new(RecoveredBlock::new_sealed(
1682                    first_in_mem_block.clone(),
1683                    in_memory_block_senders,
1684                )),
1685                Default::default(),
1686                Default::default(),
1687                ExecutedTrieUpdates::empty(),
1688            )],
1689        };
1690        consistent_provider.canonical_in_memory_state.update_chain(chain);
1691
1692        let consistent_provider = provider.consistent_provider()?;
1693
1694        // First in memory block should be found
1695        assert_eq!(
1696            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1697            Some(first_in_mem_block.clone().into_block())
1698        );
1699        assert_eq!(
1700            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1701            Some(first_in_mem_block.clone().into_block())
1702        );
1703
1704        // First database block should be found
1705        assert_eq!(
1706            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1707            Some(first_db_block.clone().into_block())
1708        );
1709        assert_eq!(
1710            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1711            Some(first_db_block.clone().into_block())
1712        );
1713
1714        Ok(())
1715    }
1716
1717    #[test]
1718    fn test_changeset_reader() -> eyre::Result<()> {
1719        let mut rng = generators::rng();
1720
1721        let (database_blocks, in_memory_blocks) =
1722            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1723
1724        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1725        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1726        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1727
1728        let accounts = random_eoa_accounts(&mut rng, 2);
1729
1730        let (database_changesets, database_state) = random_changeset_range(
1731            &mut rng,
1732            &database_blocks,
1733            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1734            0..0,
1735            0..0,
1736        );
1737        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1738            &mut rng,
1739            &in_memory_blocks,
1740            database_state
1741                .iter()
1742                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1743            0..0,
1744            0..0,
1745        );
1746
1747        let factory = create_test_provider_factory();
1748
1749        let provider_rw = factory.provider_rw()?;
1750        provider_rw.append_blocks_with_state(
1751            database_blocks
1752                .into_iter()
1753                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1754                .collect(),
1755            &ExecutionOutcome {
1756                bundle: BundleState::new(
1757                    database_state.into_iter().map(|(address, (account, _))| {
1758                        (address, None, Some(account.into()), Default::default())
1759                    }),
1760                    database_changesets
1761                        .iter()
1762                        .map(|block_changesets| {
1763                            block_changesets.iter().map(|(address, account, _)| {
1764                                (*address, Some(Some((*account).into())), [])
1765                            })
1766                        })
1767                        .collect::<Vec<_>>(),
1768                    Vec::new(),
1769                ),
1770                first_block: first_database_block,
1771                ..Default::default()
1772            },
1773            Default::default(),
1774            Default::default(),
1775        )?;
1776        provider_rw.commit()?;
1777
1778        let provider = BlockchainProvider::new(factory)?;
1779
1780        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1781        let chain = NewCanonicalChain::Commit {
1782            new: vec![in_memory_blocks
1783                .first()
1784                .map(|block| {
1785                    let senders = block.senders().expect("failed to recover senders");
1786                    ExecutedBlockWithTrieUpdates::new(
1787                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1788                        Arc::new(ExecutionOutcome {
1789                            bundle: BundleState::new(
1790                                in_memory_state.into_iter().map(|(address, (account, _))| {
1791                                    (address, None, Some(account.into()), Default::default())
1792                                }),
1793                                [in_memory_changesets.iter().map(|(address, account, _)| {
1794                                    (*address, Some(Some((*account).into())), Vec::new())
1795                                })],
1796                                [],
1797                            ),
1798                            first_block: first_in_memory_block,
1799                            ..Default::default()
1800                        }),
1801                        Default::default(),
1802                        ExecutedTrieUpdates::empty(),
1803                    )
1804                })
1805                .unwrap()],
1806        };
1807        provider.canonical_in_memory_state.update_chain(chain);
1808
1809        let consistent_provider = provider.consistent_provider()?;
1810
1811        assert_eq!(
1812            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1813            database_changesets
1814                .into_iter()
1815                .next_back()
1816                .unwrap()
1817                .into_iter()
1818                .sorted_by_key(|(address, _, _)| *address)
1819                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1820                .collect::<Vec<_>>()
1821        );
1822        assert_eq!(
1823            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1824            in_memory_changesets
1825                .into_iter()
1826                .sorted_by_key(|(address, _, _)| *address)
1827                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1828                .collect::<Vec<_>>()
1829        );
1830
1831        Ok(())
1832    }
1833}