reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4    BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7    TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11    eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber, BlockId, BlockNumHash,
12    BlockNumberOrTag, HashOrNumber,
13};
14use alloy_primitives::{
15    map::{hash_map, HashMap},
16    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::{ChainInfo, EthereumHardforks};
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24    Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
30    StateProvider, StorageChangeSetReader,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
/// this provider.
///
/// The snapshot consists of a read-only database provider plus the in-memory head block that was
/// current when the provider was created.
///
/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
/// time-out.
#[derive(Debug)]
#[doc(hidden)] // triggers ICE for `cargo docs`
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Storage provider obtained from the [`ProviderFactory`] at construction time.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head block at time of [`Self`] creation, or `None` if there was no in-memory head state.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
218    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
219    /// storage and account changesets.
220    fn populate_bundle_state(
221        &self,
222        account_changeset: Vec<(u64, AccountBeforeTx)>,
223        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224        block_range_end: BlockNumber,
225    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226        let mut state: BundleStateInit = HashMap::default();
227        let mut reverts: RevertsInit = HashMap::default();
228        let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230        // add account changeset changes
231        for (block_number, account_before) in account_changeset.into_iter().rev() {
232            let AccountBeforeTx { info: old_info, address } = account_before;
233            match state.entry(address) {
234                hash_map::Entry::Vacant(entry) => {
235                    let new_info = state_provider.basic_account(&address)?;
236                    entry.insert((old_info, new_info, HashMap::default()));
237                }
238                hash_map::Entry::Occupied(mut entry) => {
239                    // overwrite old account state.
240                    entry.get_mut().0 = old_info;
241                }
242            }
243            // insert old info into reverts.
244            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245        }
246
247        // add storage changeset changes
248        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249            let BlockNumberAddress((block_number, address)) = block_and_address;
250            // get account state or insert from plain state.
251            let account_state = match state.entry(address) {
252                hash_map::Entry::Vacant(entry) => {
253                    let present_info = state_provider.basic_account(&address)?;
254                    entry.insert((present_info, present_info, HashMap::default()))
255                }
256                hash_map::Entry::Occupied(entry) => entry.into_mut(),
257            };
258
259            // match storage.
260            match account_state.2.entry(old_storage.key) {
261                hash_map::Entry::Vacant(entry) => {
262                    let new_storage_value =
263                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264                    entry.insert((old_storage.value, new_storage_value));
265                }
266                hash_map::Entry::Occupied(mut entry) => {
267                    entry.get_mut().0 = old_storage.value;
268                }
269            };
270
271            reverts
272                .entry(block_number)
273                .or_default()
274                .entry(address)
275                .or_default()
276                .1
277                .push(old_storage);
278        }
279
280        Ok((state, reverts))
281    }
282
    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
    /// is met.
    ///
    /// Works on the in-memory chain reachable from `self.head_block` and on
    /// `self.storage_provider`, i.e. on the view held by this provider. Splits the range into
    /// in-memory and storage sections, prioritizing recent in-memory blocks in case of overlaps.
    /// Items fetched from storage are placed first in the result, followed by the in-memory items
    /// in ascending block order.
    ///
    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
    ///   user to retrieve the required items from the database using [`RangeInclusive`].
    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
    ///   state, allowing for selection or filtering for the desired data. Returning `None` stops
    ///   the in-memory iteration.
    /// * `predicate` (`P`) is passed by mutable reference to both functions above.
    ///
    /// Returns an empty vector if the resolved range is empty (`start > end`). If the database
    /// returns fewer items than the storage section spans, those items are returned without
    /// consulting the in-memory section.
    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_db_range: F,
        map_block_state_item: G,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<BlockNumber>,
            &mut P,
        ) -> ProviderResult<Vec<T>>,
        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
        P: FnMut(&T) -> bool,
    {
        // Collect the in-memory chain starting at the captured head block (empty if there is no
        // head block) and borrow the storage provider held by `self`.
        //
        // The chain is ordered from the highest block (first) to the lowest block (last).
        let mut in_memory_chain =
            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
        let db_provider = &self.storage_provider;

        // An unbounded end resolves to the highest in-memory block, or to the last database
        // block (0 if that lookup fails) when there is no in-memory chain.
        let (start, end) = self.convert_range_bounds(range, || {
            // the first block is the highest one.
            in_memory_chain
                .first()
                .map(|b| b.number())
                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
        });

        if start > end {
            return Ok(vec![])
        }

        // Split range into storage_range and in-memory range. If the in-memory range is not
        // necessary drop it early.
        //
        // The last block of `in_memory_chain` is the lowest block number.
        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
            Some(lowest_memory_block) if lowest_memory_block <= end => {
                // The chain is non-empty in this arm, so a first element exists.
                let highest_memory_block =
                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

                // Database will for a time overlap with in-memory-chain blocks. In
                // case of a re-org, it can mean that the database blocks are of a forked chain, and
                // so, we should prioritize the in-memory overlapped blocks.
                let in_memory_range =
                    lowest_memory_block.max(start)..=end.min(highest_memory_block);

                // If requested range is in the middle of the in-memory range, remove the necessary
                // lowest blocks (they sit at the tail of the vector).
                in_memory_chain.truncate(
                    in_memory_chain
                        .len()
                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
                );

                // Everything below the lowest in-memory block has to come from storage.
                let storage_range =
                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

                (Some((in_memory_chain, in_memory_range)), storage_range)
            }
            _ => {
                // Drop the in-memory chain so we don't hold blocks in memory.
                drop(in_memory_chain);

                (None, Some(start..=end))
            }
        };

        let mut items = Vec::with_capacity((end - start + 1) as usize);

        if let Some(storage_range) = storage_range {
            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
            items.append(&mut db_items);

            // The predicate was not met, if the number of items differs from the expected. So, we
            // return what we have.
            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
                return Ok(items)
            }
        }

        if let Some((in_memory_chain, in_memory_range)) = in_memory {
            // Walk the chain in reverse (lowest block first) alongside the block numbers of the
            // in-memory range; the zip stops at whichever side ends first.
            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
                debug_assert!(num == block.number());
                if let Some(item) = map_block_state_item(block, &mut predicate) {
                    items.push(item);
                } else {
                    break
                }
            }
        }

        Ok(items)
    }
395
396    /// This uses a given [`BlockState`] to initialize a state provider for that block.
397    fn block_state_provider_ref(
398        &self,
399        state: &BlockState<N::Primitives>,
400    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401        let anchor_hash = state.anchor().hash;
402        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405    }
406
    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
    ///
    /// Items fetched from storage come first in the result, followed by items from the in-memory
    /// blocks, walked from the lowest to the highest block.
    ///
    /// * `range`: the requested range; it is converted into an inclusive `start..=end` pair and
    ///   used as a range of transaction numbers. NOTE(review): the bound is declared as
    ///   `RangeBounds<BlockNumber>` although the values are treated as tx numbers; presumably
    ///   both are aliases of the same integer type, confirm.
    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
    ///
    /// Returns an empty vector if the resolved range is empty (`start > end`).
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if the body indices of the last
    /// database block are missing, and propagates errors of both fetch functions.
    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_from_db: S,
        fetch_from_block_state: M,
    ) -> ProviderResult<Vec<R>>
    where
        S: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<TxNumber>,
        ) -> ProviderResult<Vec<R>>,
        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
    {
        // In-memory chain of the captured head block (empty if there is none); it is iterated in
        // reverse below to go from the lowest to the highest block.
        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
        let provider = &self.storage_provider;

        // Get the last block number stored in the storage which does NOT overlap with in-memory
        // chain.
        let last_database_block_number = in_mem_chain
            .last()
            .map(|b| Ok(b.anchor().number))
            .unwrap_or_else(|| provider.last_block_number())?;

        // Get the next tx number for the last block stored in the storage, which marks the start of
        // the in-memory state.
        let last_block_body_index = provider
            .block_body_indices(last_database_block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
        let mut in_memory_tx_num = last_block_body_index.next_tx_num();

        // An unbounded end resolves to the last database tx number plus the number of
        // transactions held by the in-memory blocks.
        let (start, end) = self.convert_range_bounds(range, || {
            in_mem_chain
                .iter()
                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
                .sum::<u64>() +
                last_block_body_index.last_tx_num()
        });

        if start > end {
            return Ok(vec![])
        }

        let mut tx_range = start..=end;

        // If the range is entirely before the first in-memory transaction number, fetch from
        // storage
        if *tx_range.end() < in_memory_tx_num {
            return fetch_from_db(provider, tx_range);
        }

        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

        // If the range spans storage and memory, get elements from storage first.
        if *tx_range.start() < in_memory_tx_num {
            // Determine the range that needs to be fetched from storage.
            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

            // Set the remaining transaction range for in-memory
            tx_range = in_memory_tx_num..=*tx_range.end();

            items.extend(fetch_from_db(provider, db_range)?);
        }

        // Iterate from the lowest block to the highest in-memory chain.
        //
        // `in_memory_tx_num` is the tx number of the first transaction of the block currently
        // being visited; `tx_range` is the part of the request that is still outstanding.
        for block_state in in_mem_chain.iter().rev() {
            let block_tx_count =
                block_state.block_ref().recovered_block().body().transactions().len();
            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

            // If the transaction range start is equal or higher than the next block first
            // transaction, advance
            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
                in_memory_tx_num += block_tx_count as u64;
                continue
            }

            // This should only be more than 0 once, in case of a partial range inside a block.
            let skip = (tx_range.start() - in_memory_tx_num) as usize;

            // Request the block-local indices `skip..=last`, capped by both the outstanding
            // number of transactions and the end of the block.
            items.extend(fetch_from_block_state(
                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
                block_state,
            )?);

            in_memory_tx_num += block_tx_count as u64;

            // Break if the range has been fully processed
            if in_memory_tx_num > *tx_range.end() {
                break
            }

            // Set updated range
            tx_range = in_memory_tx_num..=*tx_range.end();
        }

        Ok(items)
    }
509
510    /// Fetches data from either in-memory state or persistent storage by transaction
511    /// [`HashOrNumber`].
512    fn get_in_memory_or_storage_by_tx<S, M, R>(
513        &self,
514        id: HashOrNumber,
515        fetch_from_db: S,
516        fetch_from_block_state: M,
517    ) -> ProviderResult<Option<R>>
518    where
519        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521    {
522        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523        let provider = &self.storage_provider;
524
525        // Get the last block number stored in the database which does NOT overlap with in-memory
526        // chain.
527        let last_database_block_number = in_mem_chain
528            .last()
529            .map(|b| Ok(b.anchor().number))
530            .unwrap_or_else(|| provider.last_block_number())?;
531
532        // Get the next tx number for the last block stored in the database and consider it the
533        // first tx number of the in-memory state
534        let last_block_body_index = provider
535            .block_body_indices(last_database_block_number)?
536            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539        // If the transaction number is less than the first in-memory transaction number, make a
540        // database lookup
541        if let HashOrNumber::Number(id) = id {
542            if id < in_memory_tx_num {
543                return fetch_from_db(provider)
544            }
545        }
546
547        // Iterate from the lowest block to the highest
548        for block_state in in_mem_chain.iter().rev() {
549            let executed_block = block_state.block_ref();
550            let block = executed_block.recovered_block();
551
552            for tx_index in 0..block.body().transactions().len() {
553                match id {
554                    HashOrNumber::Hash(tx_hash) => {
555                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557                        }
558                    }
559                    HashOrNumber::Number(id) => {
560                        if id == in_memory_tx_num {
561                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562                        }
563                    }
564                }
565
566                in_memory_tx_num += 1;
567            }
568        }
569
570        // Not found in-memory, so check database.
571        if let HashOrNumber::Hash(_) = id {
572            return fetch_from_db(provider)
573        }
574
575        Ok(None)
576    }
577
578    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
579    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580        &self,
581        id: BlockHashOrNumber,
582        fetch_from_db: S,
583        fetch_from_block_state: M,
584    ) -> ProviderResult<R>
585    where
586        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588    {
589        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590            return fetch_from_block_state(block_state)
591        }
592        fetch_from_db(&self.storage_provider)
593    }
594}
595
596impl<N: ProviderNodeTypes> ConsistentProvider<N> {
597    /// Ensures that the given block number is canonical (synced)
598    ///
599    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
600    /// out of range and would lead to invalid results, mainly during initial sync.
601    ///
602    /// Verifying the `block_number` would be expensive since we need to lookup sync table
603    /// Instead, we ensure that the `block_number` is within the range of the
604    /// [`Self::best_block_number`] which is updated when a block is synced.
605    #[inline]
606    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
607        let latest = self.best_block_number()?;
608        if block_number > latest {
609            Err(ProviderError::HeaderNotFound(block_number.into()))
610        } else {
611            Ok(())
612        }
613    }
614}
615
// The provider uses the primitives declared by the node types `N`.
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    type Primitives = N::Primitives;
}
619
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the underlying storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }
}
625
626impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
627    type Header = HeaderTy<N>;
628
629    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
630        self.get_in_memory_or_storage_by_block(
631            (*block_hash).into(),
632            |db_provider| db_provider.header(block_hash),
633            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
634        )
635    }
636
637    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
638        self.get_in_memory_or_storage_by_block(
639            num.into(),
640            |db_provider| db_provider.header_by_number(num),
641            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
642        )
643    }
644
645    fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
646        if let Some(num) = self.block_number(*hash)? {
647            self.header_td_by_number(num)
648        } else {
649            Ok(None)
650        }
651    }
652
653    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
654        let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
655        {
656            // If the block exists in memory, we should return a TD for it.
657            //
658            // The canonical in memory state should only store post-merge blocks. Post-merge blocks
659            // have zero difficulty. This means we can use the total difficulty for the last
660            // finalized block number if present (so that we are not affected by reorgs), if not the
661            // last number in the database will be used.
662            if let Some(last_finalized_num_hash) =
663                self.canonical_in_memory_state.get_finalized_num_hash()
664            {
665                last_finalized_num_hash.number
666            } else {
667                self.last_block_number()?
668            }
669        } else {
670            // Otherwise, return what we have on disk for the input block
671            number
672        };
673        self.storage_provider.header_td_by_number(number)
674    }
675
676    fn headers_range(
677        &self,
678        range: impl RangeBounds<BlockNumber>,
679    ) -> ProviderResult<Vec<Self::Header>> {
680        self.get_in_memory_or_storage_by_block_range_while(
681            range,
682            |db_provider, range, _| db_provider.headers_range(range),
683            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
684            |_| true,
685        )
686    }
687
688    fn sealed_header(
689        &self,
690        number: BlockNumber,
691    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
692        self.get_in_memory_or_storage_by_block(
693            number.into(),
694            |db_provider| db_provider.sealed_header(number),
695            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
696        )
697    }
698
699    fn sealed_headers_range(
700        &self,
701        range: impl RangeBounds<BlockNumber>,
702    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
703        self.get_in_memory_or_storage_by_block_range_while(
704            range,
705            |db_provider, range, _| db_provider.sealed_headers_range(range),
706            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
707            |_| true,
708        )
709    }
710
711    fn sealed_headers_while(
712        &self,
713        range: impl RangeBounds<BlockNumber>,
714        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
715    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
716        self.get_in_memory_or_storage_by_block_range_while(
717            range,
718            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
719            |block_state, predicate| {
720                let header = block_state.block_ref().recovered_block().sealed_header();
721                predicate(header).then(|| header.clone())
722            },
723            predicate,
724        )
725    }
726}
727
728impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
729    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
730        self.get_in_memory_or_storage_by_block(
731            number.into(),
732            |db_provider| db_provider.block_hash(number),
733            |block_state| Ok(Some(block_state.hash())),
734        )
735    }
736
737    fn canonical_hashes_range(
738        &self,
739        start: BlockNumber,
740        end: BlockNumber,
741    ) -> ProviderResult<Vec<B256>> {
742        self.get_in_memory_or_storage_by_block_range_while(
743            start..end,
744            |db_provider, inclusive_range, _| {
745                db_provider
746                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
747            },
748            |block_state, _| Some(block_state.hash()),
749            |_| true,
750        )
751    }
752}
753
754impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
755    fn chain_info(&self) -> ProviderResult<ChainInfo> {
756        let best_number = self.best_block_number()?;
757        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
758    }
759
760    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
761        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
762    }
763
764    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
765        self.storage_provider.last_block_number()
766    }
767
768    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
769        self.get_in_memory_or_storage_by_block(
770            hash.into(),
771            |db_provider| db_provider.block_number(hash),
772            |block_state| Ok(Some(block_state.number())),
773        )
774    }
775}
776
777impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
778    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
779        Ok(self.canonical_in_memory_state.pending_block_num_hash())
780    }
781
782    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
783        Ok(self.canonical_in_memory_state.get_safe_num_hash())
784    }
785
786    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
788    }
789}
790
791impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
792    type Block = BlockTy<N>;
793
794    fn find_block_by_hash(
795        &self,
796        hash: B256,
797        source: BlockSource,
798    ) -> ProviderResult<Option<Self::Block>> {
799        if matches!(source, BlockSource::Canonical | BlockSource::Any) {
800            if let Some(block) = self.get_in_memory_or_storage_by_block(
801                hash.into(),
802                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
803                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
804            )? {
805                return Ok(Some(block))
806            }
807        }
808
809        if matches!(source, BlockSource::Pending | BlockSource::Any) {
810            return Ok(self
811                .canonical_in_memory_state
812                .pending_block()
813                .filter(|b| b.hash() == hash)
814                .map(|b| b.into_block()))
815        }
816
817        Ok(None)
818    }
819
820    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
821        self.get_in_memory_or_storage_by_block(
822            id,
823            |db_provider| db_provider.block(id),
824            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
825        )
826    }
827
828    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
829        Ok(self.canonical_in_memory_state.pending_block())
830    }
831
832    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
833        Ok(self.canonical_in_memory_state.pending_recovered_block())
834    }
835
836    fn pending_block_and_receipts(
837        &self,
838    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
839        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
840    }
841
842    /// Returns the block with senders with matching number or hash from database.
843    ///
844    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
845    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
846    ///
847    /// Returns `None` if block is not found.
848    fn recovered_block(
849        &self,
850        id: BlockHashOrNumber,
851        transaction_kind: TransactionVariant,
852    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
853        self.get_in_memory_or_storage_by_block(
854            id,
855            |db_provider| db_provider.recovered_block(id, transaction_kind),
856            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
857        )
858    }
859
860    fn sealed_block_with_senders(
861        &self,
862        id: BlockHashOrNumber,
863        transaction_kind: TransactionVariant,
864    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
865        self.get_in_memory_or_storage_by_block(
866            id,
867            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
868            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
869        )
870    }
871
872    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
873        self.get_in_memory_or_storage_by_block_range_while(
874            range,
875            |db_provider, range, _| db_provider.block_range(range),
876            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
877            |_| true,
878        )
879    }
880
881    fn block_with_senders_range(
882        &self,
883        range: RangeInclusive<BlockNumber>,
884    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
885        self.get_in_memory_or_storage_by_block_range_while(
886            range,
887            |db_provider, range, _| db_provider.block_with_senders_range(range),
888            |block_state, _| Some(block_state.block().recovered_block().clone()),
889            |_| true,
890        )
891    }
892
893    fn recovered_block_range(
894        &self,
895        range: RangeInclusive<BlockNumber>,
896    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
897        self.get_in_memory_or_storage_by_block_range_while(
898            range,
899            |db_provider, range, _| db_provider.recovered_block_range(range),
900            |block_state, _| Some(block_state.block().recovered_block().clone()),
901            |_| true,
902        )
903    }
904}
905
906impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
907    type Transaction = TxTy<N>;
908
909    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
910        self.get_in_memory_or_storage_by_tx(
911            tx_hash.into(),
912            |db_provider| db_provider.transaction_id(tx_hash),
913            |_, tx_number, _| Ok(Some(tx_number)),
914        )
915    }
916
917    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
918        self.get_in_memory_or_storage_by_tx(
919            id.into(),
920            |provider| provider.transaction_by_id(id),
921            |tx_index, _, block_state| {
922                Ok(block_state
923                    .block_ref()
924                    .recovered_block()
925                    .body()
926                    .transactions()
927                    .get(tx_index)
928                    .cloned())
929            },
930        )
931    }
932
933    fn transaction_by_id_unhashed(
934        &self,
935        id: TxNumber,
936    ) -> ProviderResult<Option<Self::Transaction>> {
937        self.get_in_memory_or_storage_by_tx(
938            id.into(),
939            |provider| provider.transaction_by_id_unhashed(id),
940            |tx_index, _, block_state| {
941                Ok(block_state
942                    .block_ref()
943                    .recovered_block()
944                    .body()
945                    .transactions()
946                    .get(tx_index)
947                    .cloned())
948            },
949        )
950    }
951
952    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
953        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
954            return Ok(Some(tx))
955        }
956
957        self.storage_provider.transaction_by_hash(hash)
958    }
959
960    fn transaction_by_hash_with_meta(
961        &self,
962        tx_hash: TxHash,
963    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
964        if let Some((tx, meta)) =
965            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
966        {
967            return Ok(Some((tx, meta)))
968        }
969
970        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
971    }
972
973    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
974        self.get_in_memory_or_storage_by_tx(
975            id.into(),
976            |provider| provider.transaction_block(id),
977            |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
978        )
979    }
980
981    fn transactions_by_block(
982        &self,
983        id: BlockHashOrNumber,
984    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
985        self.get_in_memory_or_storage_by_block(
986            id,
987            |provider| provider.transactions_by_block(id),
988            |block_state| {
989                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
990            },
991        )
992    }
993
994    fn transactions_by_block_range(
995        &self,
996        range: impl RangeBounds<BlockNumber>,
997    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
998        self.get_in_memory_or_storage_by_block_range_while(
999            range,
1000            |db_provider, range, _| db_provider.transactions_by_block_range(range),
1001            |block_state, _| {
1002                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1003            },
1004            |_| true,
1005        )
1006    }
1007
1008    fn transactions_by_tx_range(
1009        &self,
1010        range: impl RangeBounds<TxNumber>,
1011    ) -> ProviderResult<Vec<Self::Transaction>> {
1012        self.get_in_memory_or_storage_by_tx_range(
1013            range,
1014            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1015            |index_range, block_state| {
1016                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1017                    .to_vec())
1018            },
1019        )
1020    }
1021
1022    fn senders_by_tx_range(
1023        &self,
1024        range: impl RangeBounds<TxNumber>,
1025    ) -> ProviderResult<Vec<Address>> {
1026        self.get_in_memory_or_storage_by_tx_range(
1027            range,
1028            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1029            |index_range, block_state| {
1030                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1031            },
1032        )
1033    }
1034
1035    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036        self.get_in_memory_or_storage_by_tx(
1037            id.into(),
1038            |provider| provider.transaction_sender(id),
1039            |tx_index, _, block_state| {
1040                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1041            },
1042        )
1043    }
1044}
1045
1046impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1047    type Receipt = ReceiptTy<N>;
1048
1049    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1050        self.get_in_memory_or_storage_by_tx(
1051            id.into(),
1052            |provider| provider.receipt(id),
1053            |tx_index, _, block_state| {
1054                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1055            },
1056        )
1057    }
1058
1059    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1060        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1061            let executed_block = block_state.block_ref();
1062            let block = executed_block.recovered_block();
1063            let receipts = block_state.executed_block_receipts();
1064
1065            // assuming 1:1 correspondence between transactions and receipts
1066            debug_assert_eq!(
1067                block.body().transactions().len(),
1068                receipts.len(),
1069                "Mismatch between transaction and receipt count"
1070            );
1071
1072            if let Some(tx_index) =
1073                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1074            {
1075                // safe to use tx_index for receipts due to 1:1 correspondence
1076                return Ok(receipts.get(tx_index).cloned());
1077            }
1078        }
1079
1080        self.storage_provider.receipt_by_hash(hash)
1081    }
1082
1083    fn receipts_by_block(
1084        &self,
1085        block: BlockHashOrNumber,
1086    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1087        self.get_in_memory_or_storage_by_block(
1088            block,
1089            |db_provider| db_provider.receipts_by_block(block),
1090            |block_state| Ok(Some(block_state.executed_block_receipts())),
1091        )
1092    }
1093
1094    fn receipts_by_tx_range(
1095        &self,
1096        range: impl RangeBounds<TxNumber>,
1097    ) -> ProviderResult<Vec<Self::Receipt>> {
1098        self.get_in_memory_or_storage_by_tx_range(
1099            range,
1100            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1101            |index_range, block_state| {
1102                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1103            },
1104        )
1105    }
1106}
1107
1108impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1109    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1110        match block {
1111            BlockId::Hash(rpc_block_hash) => {
1112                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1113                if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1114                    if let Some(state) = self
1115                        .head_block
1116                        .as_ref()
1117                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1118                    {
1119                        receipts = Some(state.executed_block_receipts());
1120                    }
1121                }
1122                Ok(receipts)
1123            }
1124            BlockId::Number(num_tag) => match num_tag {
1125                BlockNumberOrTag::Pending => Ok(self
1126                    .canonical_in_memory_state
1127                    .pending_state()
1128                    .map(|block_state| block_state.executed_block_receipts())),
1129                _ => {
1130                    if let Some(num) = self.convert_block_number(num_tag)? {
1131                        self.receipts_by_block(num.into())
1132                    } else {
1133                        Ok(None)
1134                    }
1135                }
1136            },
1137        }
1138    }
1139}
1140
1141impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1142    fn withdrawals_by_block(
1143        &self,
1144        id: BlockHashOrNumber,
1145        timestamp: u64,
1146    ) -> ProviderResult<Option<Withdrawals>> {
1147        if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1148            return Ok(None)
1149        }
1150
1151        self.get_in_memory_or_storage_by_block(
1152            id,
1153            |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1154            |block_state| {
1155                Ok(block_state.block_ref().recovered_block().body().withdrawals().cloned())
1156            },
1157        )
1158    }
1159}
1160
1161impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1162    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1163        self.get_in_memory_or_storage_by_block(
1164            id,
1165            |db_provider| db_provider.ommers(id),
1166            |block_state| {
1167                if self.chain_spec().is_paris_active_at_block(block_state.number()) {
1168                    return Ok(Some(Vec::new()))
1169                }
1170
1171                Ok(block_state.block_ref().recovered_block().body().ommers().map(|o| o.to_vec()))
1172            },
1173        )
1174    }
1175}
1176
1177impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1178    fn block_body_indices(
1179        &self,
1180        number: BlockNumber,
1181    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1182        self.get_in_memory_or_storage_by_block(
1183            number.into(),
1184            |db_provider| db_provider.block_body_indices(number),
1185            |block_state| {
1186                // Find the last block indices on database
1187                let last_storage_block_number = block_state.anchor().number;
1188                let mut stored_indices = self
1189                    .storage_provider
1190                    .block_body_indices(last_storage_block_number)?
1191                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1192
1193                // Prepare our block indices
1194                stored_indices.first_tx_num = stored_indices.next_tx_num();
1195                stored_indices.tx_count = 0;
1196
1197                // Iterate from the lowest block in memory until our target block
1198                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1199                    let block_tx_count =
1200                        state.block_ref().recovered_block().body().transactions().len() as u64;
1201                    if state.block_ref().recovered_block().number() == number {
1202                        stored_indices.tx_count = block_tx_count;
1203                    } else {
1204                        stored_indices.first_tx_num += block_tx_count;
1205                    }
1206                }
1207
1208                Ok(Some(stored_indices))
1209            },
1210        )
1211    }
1212
1213    fn block_body_indices_range(
1214        &self,
1215        range: RangeInclusive<BlockNumber>,
1216    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1217        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1218    }
1219}
1220
1221impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1222    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1223        self.storage_provider.get_stage_checkpoint(id)
1224    }
1225
1226    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1227        self.storage_provider.get_stage_checkpoint_progress(id)
1228    }
1229
1230    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1231        self.storage_provider.get_all_checkpoints()
1232    }
1233}
1234
1235impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1236    fn get_prune_checkpoint(
1237        &self,
1238        segment: PruneSegment,
1239    ) -> ProviderResult<Option<PruneCheckpoint>> {
1240        self.storage_provider.get_prune_checkpoint(segment)
1241    }
1242
1243    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1244        self.storage_provider.get_prune_checkpoints()
1245    }
1246}
1247
1248impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1249    type ChainSpec = N::ChainSpec;
1250
1251    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1252        ChainSpecProvider::chain_spec(&self.storage_provider)
1253    }
1254}
1255
1256impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1257    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1258        match id {
1259            BlockId::Number(num) => self.block_by_number_or_tag(num),
1260            BlockId::Hash(hash) => {
1261                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1262                // so not at the provider level?
1263                // if we decide to do this at a higher level, then we can make this an automatic
1264                // trait impl
1265                if Some(true) == hash.require_canonical {
1266                    // check the database, canonical blocks are only stored in the database
1267                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1268                } else {
1269                    self.block_by_hash(hash.block_hash)
1270                }
1271            }
1272        }
1273    }
1274
1275    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1276        Ok(match id {
1277            BlockNumberOrTag::Latest => {
1278                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1279            }
1280            BlockNumberOrTag::Finalized => {
1281                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1282            }
1283            BlockNumberOrTag::Safe => {
1284                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1285            }
1286            BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1287            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1288
1289            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1290        })
1291    }
1292
1293    fn sealed_header_by_number_or_tag(
1294        &self,
1295        id: BlockNumberOrTag,
1296    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1297        match id {
1298            BlockNumberOrTag::Latest => {
1299                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1300            }
1301            BlockNumberOrTag::Finalized => {
1302                Ok(self.canonical_in_memory_state.get_finalized_header())
1303            }
1304            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1305            BlockNumberOrTag::Earliest => self
1306                .header_by_number(0)?
1307                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1308            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1309            BlockNumberOrTag::Number(num) => self
1310                .header_by_number(num)?
1311                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1312        }
1313    }
1314
1315    fn sealed_header_by_id(
1316        &self,
1317        id: BlockId,
1318    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1319        Ok(match id {
1320            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1321            BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1322        })
1323    }
1324
1325    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1326        Ok(match id {
1327            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1328            BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1329        })
1330    }
1331
1332    fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1333        match id {
1334            BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1335            BlockId::Hash(hash) => {
1336                // TODO: EIP-1898 question, see above
1337                // here it is not handled
1338                self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1339            }
1340        }
1341    }
1342}
1343
1344impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1345    fn storage_changeset(
1346        &self,
1347        block_number: BlockNumber,
1348    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1349        if let Some(state) =
1350            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1351        {
1352            let changesets = state
1353                .block()
1354                .execution_output
1355                .bundle
1356                .reverts
1357                .clone()
1358                .to_plain_state_reverts()
1359                .storage
1360                .into_iter()
1361                .flatten()
1362                .flat_map(|revert: PlainStorageRevert| {
1363                    revert.storage_revert.into_iter().map(move |(key, value)| {
1364                        (
1365                            BlockNumberAddress((block_number, revert.address)),
1366                            StorageEntry { key: key.into(), value: value.to_previous_value() },
1367                        )
1368                    })
1369                })
1370                .collect();
1371            Ok(changesets)
1372        } else {
1373            // Perform checks on whether or not changesets exist for the block.
1374
1375            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1376            let storage_history_exists = self
1377                .storage_provider
1378                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1379                .and_then(|checkpoint| {
1380                    // return true if the block number is ahead of the prune checkpoint.
1381                    //
1382                    // The checkpoint stores the highest pruned block number, so we should make
1383                    // sure the block_number is strictly greater.
1384                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1385                })
1386                .unwrap_or(true);
1387
1388            if !storage_history_exists {
1389                return Err(ProviderError::StateAtBlockPruned(block_number))
1390            }
1391
1392            self.storage_provider.storage_changeset(block_number)
1393        }
1394    }
1395}
1396
1397impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1398    fn account_block_changeset(
1399        &self,
1400        block_number: BlockNumber,
1401    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1402        if let Some(state) =
1403            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1404        {
1405            let changesets = state
1406                .block_ref()
1407                .execution_output
1408                .bundle
1409                .reverts
1410                .clone()
1411                .to_plain_state_reverts()
1412                .accounts
1413                .into_iter()
1414                .flatten()
1415                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1416                .collect();
1417            Ok(changesets)
1418        } else {
1419            // Perform checks on whether or not changesets exist for the block.
1420
1421            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1422            let account_history_exists = self
1423                .storage_provider
1424                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1425                .and_then(|checkpoint| {
1426                    // return true if the block number is ahead of the prune checkpoint.
1427                    //
1428                    // The checkpoint stores the highest pruned block number, so we should make
1429                    // sure the block_number is strictly greater.
1430                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1431                })
1432                .unwrap_or(true);
1433
1434            if !account_history_exists {
1435                return Err(ProviderError::StateAtBlockPruned(block_number))
1436            }
1437
1438            self.storage_provider.account_block_changeset(block_number)
1439        }
1440    }
1441}
1442
1443impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1444    /// Get basic account information.
1445    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1446        // use latest state provider
1447        let state_provider = self.latest_ref()?;
1448        state_provider.basic_account(address)
1449    }
1450}
1451
1452impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1453    type Receipt = ReceiptTy<N>;
1454
1455    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1456    ///
1457    /// If data for the block does not exist, this will return [`None`].
1458    ///
1459    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1460    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1461    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1462    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1463    /// first place.
1464    fn get_state(
1465        &self,
1466        block: BlockNumber,
1467    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1468        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1469            let state = state.block_ref().execution_outcome().clone();
1470            Ok(Some(state))
1471        } else {
1472            Self::get_state(self, block..=block)
1473        }
1474    }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479    use crate::{
1480        providers::blockchain_provider::BlockchainProvider,
1481        test_utils::create_test_provider_factory, BlockWriter,
1482    };
1483    use alloy_eips::BlockHashOrNumber;
1484    use alloy_primitives::B256;
1485    use itertools::Itertools;
1486    use rand::Rng;
1487    use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, NewCanonicalChain};
1488    use reth_db_api::models::AccountBeforeTx;
1489    use reth_ethereum_primitives::Block;
1490    use reth_execution_types::ExecutionOutcome;
1491    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1492    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1493    use reth_testing_utils::generators::{
1494        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1495    };
1496    use revm_database::BundleState;
1497    use std::{
1498        ops::{Bound, Range, RangeBounds},
1499        sync::Arc,
1500    };
1501
    /// Number of blocks requested for the database portion of the chain when tests call
    /// `random_blocks` (passed as its `database_blocks` argument).
    const TEST_BLOCKS_COUNT: usize = 5;
1503
1504    fn random_blocks(
1505        rng: &mut impl Rng,
1506        database_blocks: usize,
1507        in_memory_blocks: usize,
1508        requests_count: Option<Range<u8>>,
1509        withdrawals_count: Option<Range<u8>>,
1510        tx_count: impl RangeBounds<u8>,
1511    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1512        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1513
1514        let tx_start = match tx_count.start_bound() {
1515            Bound::Included(&n) | Bound::Excluded(&n) => n,
1516            Bound::Unbounded => u8::MIN,
1517        };
1518        let tx_end = match tx_count.end_bound() {
1519            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1520            Bound::Unbounded => u8::MAX,
1521        };
1522
1523        let blocks = random_block_range(
1524            rng,
1525            0..=block_range,
1526            BlockRangeParams {
1527                parent: Some(B256::ZERO),
1528                tx_count: tx_start..tx_end,
1529                requests_count,
1530                withdrawals_count,
1531            },
1532        );
1533        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1534        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1535    }
1536
1537    #[test]
1538    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1539        // Initialize random number generator and provider factory
1540        let mut rng = generators::rng();
1541        let factory = create_test_provider_factory();
1542
1543        // Generate 10 random blocks and split into database and in-memory blocks
1544        let blocks = random_block_range(
1545            &mut rng,
1546            0..=10,
1547            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1548        );
1549        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1550
1551        // Insert first 5 blocks into the database
1552        let provider_rw = factory.provider_rw()?;
1553        for block in database_blocks {
1554            provider_rw.insert_historical_block(
1555                block.clone().try_recover().expect("failed to seal block with senders"),
1556            )?;
1557        }
1558        provider_rw.commit()?;
1559
1560        // Create a new provider
1561        let provider = BlockchainProvider::new(factory)?;
1562        let consistent_provider = provider.consistent_provider()?;
1563
1564        // Useful blocks
1565        let first_db_block = database_blocks.first().unwrap();
1566        let first_in_mem_block = in_memory_blocks.first().unwrap();
1567        let last_in_mem_block = in_memory_blocks.last().unwrap();
1568
1569        // No block in memory before setting in memory state
1570        assert_eq!(
1571            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1572            None
1573        );
1574        assert_eq!(
1575            consistent_provider
1576                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1577            None
1578        );
1579        // No pending block in memory
1580        assert_eq!(
1581            consistent_provider
1582                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1583            None
1584        );
1585
1586        // Insert first block into the in-memory state
1587        let in_memory_block_senders =
1588            first_in_mem_block.senders().expect("failed to recover senders");
1589        let chain = NewCanonicalChain::Commit {
1590            new: vec![ExecutedBlockWithTrieUpdates::new(
1591                Arc::new(RecoveredBlock::new_sealed(
1592                    first_in_mem_block.clone(),
1593                    in_memory_block_senders,
1594                )),
1595                Default::default(),
1596                Default::default(),
1597                Default::default(),
1598            )],
1599        };
1600        consistent_provider.canonical_in_memory_state.update_chain(chain);
1601        let consistent_provider = provider.consistent_provider()?;
1602
1603        // Now the block should be found in memory
1604        assert_eq!(
1605            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1606            Some(first_in_mem_block.clone().into_block())
1607        );
1608        assert_eq!(
1609            consistent_provider
1610                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1611            Some(first_in_mem_block.clone().into_block())
1612        );
1613
1614        // Find the first block in database by hash
1615        assert_eq!(
1616            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1617            Some(first_db_block.clone().into_block())
1618        );
1619        assert_eq!(
1620            consistent_provider
1621                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1622            Some(first_db_block.clone().into_block())
1623        );
1624
1625        // No pending block in database
1626        assert_eq!(
1627            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1628            None
1629        );
1630
1631        // Insert the last block into the pending state
1632        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1633            block: ExecutedBlock {
1634                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1635                    last_in_mem_block.clone(),
1636                    Default::default(),
1637                )),
1638                execution_output: Default::default(),
1639                hashed_state: Default::default(),
1640            },
1641            trie: Default::default(),
1642        });
1643
1644        // Now the last block should be found in memory
1645        assert_eq!(
1646            consistent_provider
1647                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1648            Some(last_in_mem_block.clone_block())
1649        );
1650
1651        Ok(())
1652    }
1653
1654    #[test]
1655    fn test_block_reader_block() -> eyre::Result<()> {
1656        // Initialize random number generator and provider factory
1657        let mut rng = generators::rng();
1658        let factory = create_test_provider_factory();
1659
1660        // Generate 10 random blocks and split into database and in-memory blocks
1661        let blocks = random_block_range(
1662            &mut rng,
1663            0..=10,
1664            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1665        );
1666        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1667
1668        // Insert first 5 blocks into the database
1669        let provider_rw = factory.provider_rw()?;
1670        for block in database_blocks {
1671            provider_rw.insert_historical_block(
1672                block.clone().try_recover().expect("failed to seal block with senders"),
1673            )?;
1674        }
1675        provider_rw.commit()?;
1676
1677        // Create a new provider
1678        let provider = BlockchainProvider::new(factory)?;
1679        let consistent_provider = provider.consistent_provider()?;
1680
1681        // First in memory block
1682        let first_in_mem_block = in_memory_blocks.first().unwrap();
1683        // First database block
1684        let first_db_block = database_blocks.first().unwrap();
1685
1686        // First in memory block should not be found yet as not integrated to the in-memory state
1687        assert_eq!(
1688            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1689            None
1690        );
1691        assert_eq!(
1692            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1693            None
1694        );
1695
1696        // Insert first block into the in-memory state
1697        let in_memory_block_senders =
1698            first_in_mem_block.senders().expect("failed to recover senders");
1699        let chain = NewCanonicalChain::Commit {
1700            new: vec![ExecutedBlockWithTrieUpdates::new(
1701                Arc::new(RecoveredBlock::new_sealed(
1702                    first_in_mem_block.clone(),
1703                    in_memory_block_senders,
1704                )),
1705                Default::default(),
1706                Default::default(),
1707                Default::default(),
1708            )],
1709        };
1710        consistent_provider.canonical_in_memory_state.update_chain(chain);
1711
1712        let consistent_provider = provider.consistent_provider()?;
1713
1714        // First in memory block should be found
1715        assert_eq!(
1716            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1717            Some(first_in_mem_block.clone().into_block())
1718        );
1719        assert_eq!(
1720            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1721            Some(first_in_mem_block.clone().into_block())
1722        );
1723
1724        // First database block should be found
1725        assert_eq!(
1726            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1727            Some(first_db_block.clone().into_block())
1728        );
1729        assert_eq!(
1730            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1731            Some(first_db_block.clone().into_block())
1732        );
1733
1734        Ok(())
1735    }
1736
1737    #[test]
1738    fn test_changeset_reader() -> eyre::Result<()> {
1739        let mut rng = generators::rng();
1740
1741        let (database_blocks, in_memory_blocks) =
1742            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1743
1744        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1745        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1746        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1747
1748        let accounts = random_eoa_accounts(&mut rng, 2);
1749
1750        let (database_changesets, database_state) = random_changeset_range(
1751            &mut rng,
1752            &database_blocks,
1753            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1754            0..0,
1755            0..0,
1756        );
1757        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1758            &mut rng,
1759            &in_memory_blocks,
1760            database_state
1761                .iter()
1762                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1763            0..0,
1764            0..0,
1765        );
1766
1767        let factory = create_test_provider_factory();
1768
1769        let provider_rw = factory.provider_rw()?;
1770        provider_rw.append_blocks_with_state(
1771            database_blocks
1772                .into_iter()
1773                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1774                .collect(),
1775            &ExecutionOutcome {
1776                bundle: BundleState::new(
1777                    database_state.into_iter().map(|(address, (account, _))| {
1778                        (address, None, Some(account.into()), Default::default())
1779                    }),
1780                    database_changesets
1781                        .iter()
1782                        .map(|block_changesets| {
1783                            block_changesets.iter().map(|(address, account, _)| {
1784                                (*address, Some(Some((*account).into())), [])
1785                            })
1786                        })
1787                        .collect::<Vec<_>>(),
1788                    Vec::new(),
1789                ),
1790                first_block: first_database_block,
1791                ..Default::default()
1792            },
1793            Default::default(),
1794            Default::default(),
1795        )?;
1796        provider_rw.commit()?;
1797
1798        let provider = BlockchainProvider::new(factory)?;
1799
1800        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1801        let chain = NewCanonicalChain::Commit {
1802            new: vec![in_memory_blocks
1803                .first()
1804                .map(|block| {
1805                    let senders = block.senders().expect("failed to recover senders");
1806                    ExecutedBlockWithTrieUpdates::new(
1807                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1808                        Arc::new(ExecutionOutcome {
1809                            bundle: BundleState::new(
1810                                in_memory_state.into_iter().map(|(address, (account, _))| {
1811                                    (address, None, Some(account.into()), Default::default())
1812                                }),
1813                                [in_memory_changesets.iter().map(|(address, account, _)| {
1814                                    (*address, Some(Some((*account).into())), Vec::new())
1815                                })],
1816                                [],
1817                            ),
1818                            first_block: first_in_memory_block,
1819                            ..Default::default()
1820                        }),
1821                        Default::default(),
1822                        Default::default(),
1823                    )
1824                })
1825                .unwrap()],
1826        };
1827        provider.canonical_in_memory_state.update_chain(chain);
1828
1829        let consistent_provider = provider.consistent_provider()?;
1830
1831        assert_eq!(
1832            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1833            database_changesets
1834                .into_iter()
1835                .next_back()
1836                .unwrap()
1837                .into_iter()
1838                .sorted_by_key(|(address, _, _)| *address)
1839                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1840                .collect::<Vec<_>>()
1841        );
1842        assert_eq!(
1843            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1844            in_memory_changesets
1845                .into_iter()
1846                .sorted_by_key(|(address, _, _)| *address)
1847                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1848                .collect::<Vec<_>>()
1849        );
1850
1851        Ok(())
1852    }
1853}