Skip to main content

reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5    BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8    TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13    HashOrNumber,
14};
15use alloy_primitives::{
16    map::{hash_map, HashMap},
17    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30    StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
40/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
41/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
42/// this provider.
43///
44/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
45/// time-out.
46#[derive(Debug)]
47#[doc(hidden)] // triggers ICE for `cargo docs`
48pub struct ConsistentProvider<N: ProviderNodeTypes> {
49    /// Storage provider.
50    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51    /// Head block at time of [`Self`] creation
52    head_block: Option<Arc<BlockState<N::Primitives>>>,
53    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
54    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
218    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
219    /// storage and account changesets.
220    fn populate_bundle_state(
221        &self,
222        account_changeset: Vec<(u64, AccountBeforeTx)>,
223        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224        block_range_end: BlockNumber,
225    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226        let mut state: BundleStateInit = HashMap::default();
227        let mut reverts: RevertsInit = HashMap::default();
228        let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230        // add account changeset changes
231        for (block_number, account_before) in account_changeset.into_iter().rev() {
232            let AccountBeforeTx { info: old_info, address } = account_before;
233            match state.entry(address) {
234                hash_map::Entry::Vacant(entry) => {
235                    let new_info = state_provider.basic_account(&address)?;
236                    entry.insert((old_info, new_info, HashMap::default()));
237                }
238                hash_map::Entry::Occupied(mut entry) => {
239                    // overwrite old account state.
240                    entry.get_mut().0 = old_info;
241                }
242            }
243            // insert old info into reverts.
244            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245        }
246
247        // add storage changeset changes
248        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249            let BlockNumberAddress((block_number, address)) = block_and_address;
250            // get account state or insert from plain state.
251            let account_state = match state.entry(address) {
252                hash_map::Entry::Vacant(entry) => {
253                    let present_info = state_provider.basic_account(&address)?;
254                    entry.insert((present_info, present_info, HashMap::default()))
255                }
256                hash_map::Entry::Occupied(entry) => entry.into_mut(),
257            };
258
259            // match storage.
260            match account_state.2.entry(old_storage.key) {
261                hash_map::Entry::Vacant(entry) => {
262                    let new_storage_value =
263                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264                    entry.insert((old_storage.value, new_storage_value));
265                }
266                hash_map::Entry::Occupied(mut entry) => {
267                    entry.get_mut().0 = old_storage.value;
268                }
269            };
270
271            reverts
272                .entry(block_number)
273                .or_default()
274                .entry(address)
275                .or_default()
276                .1
277                .push(old_storage);
278        }
279
280        Ok((state, reverts))
281    }
282
283    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
284    /// is met.
285    ///
286    /// Creates a snapshot of the in-memory chain state and database provider to prevent
287    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
288    /// recent in-memory blocks in case of overlaps.
289    ///
290    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
291    ///   user to retrieve the required items from the database using [`RangeInclusive`].
292    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
293    ///   state, allowing for selection or filtering for the desired data.
294    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295        &self,
296        range: impl RangeBounds<BlockNumber>,
297        fetch_db_range: F,
298        map_block_state_item: G,
299        mut predicate: P,
300    ) -> ProviderResult<Vec<T>>
301    where
302        F: FnOnce(
303            &DatabaseProviderRO<N::DB, N>,
304            RangeInclusive<BlockNumber>,
305            &mut P,
306        ) -> ProviderResult<Vec<T>>,
307        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308        P: FnMut(&T) -> bool,
309    {
310        // Each one provides a snapshot at the time of instantiation, but its order matters.
311        //
312        // If we acquire first the database provider, it's possible that before the in-memory chain
313        // snapshot is instantiated, it will flush blocks to disk. This would
314        // mean that our database provider would not have access to the flushed blocks (since it's
315        // working under an older view), while the in-memory state may have deleted them
316        // entirely. Resulting in gaps on the range.
317        let mut in_memory_chain =
318            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319        let db_provider = &self.storage_provider;
320
321        let (start, end) = self.convert_range_bounds(range, || {
322            // the first block is the highest one.
323            in_memory_chain
324                .first()
325                .map(|b| b.number())
326                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327        });
328
329        if start > end {
330            return Ok(vec![])
331        }
332
333        // Split range into storage_range and in-memory range. If the in-memory range is not
334        // necessary drop it early.
335        //
336        // The last block of `in_memory_chain` is the lowest block number.
337        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338            Some(lowest_memory_block) if lowest_memory_block <= end => {
339                let highest_memory_block =
340                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342                // Database will for a time overlap with in-memory-chain blocks. In
343                // case of a re-org, it can mean that the database blocks are of a forked chain, and
344                // so, we should prioritize the in-memory overlapped blocks.
345                let in_memory_range =
346                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348                // If requested range is in the middle of the in-memory range, remove the necessary
349                // lowest blocks
350                in_memory_chain.truncate(
351                    in_memory_chain
352                        .len()
353                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354                );
355
356                let storage_range =
357                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359                (Some((in_memory_chain, in_memory_range)), storage_range)
360            }
361            _ => {
362                // Drop the in-memory chain so we don't hold blocks in memory.
363                drop(in_memory_chain);
364
365                (None, Some(start..=end))
366            }
367        };
368
369        let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371        if let Some(storage_range) = storage_range {
372            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373            items.append(&mut db_items);
374
375            // The predicate was not met, if the number of items differs from the expected. So, we
376            // return what we have.
377            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378                return Ok(items)
379            }
380        }
381
382        if let Some((in_memory_chain, in_memory_range)) = in_memory {
383            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384                debug_assert!(num == block.number());
385                if let Some(item) = map_block_state_item(block, &mut predicate) {
386                    items.push(item);
387                } else {
388                    break
389                }
390            }
391        }
392
393        Ok(items)
394    }
395
396    /// This uses a given [`BlockState`] to initialize a state provider for that block.
397    fn block_state_provider_ref(
398        &self,
399        state: &BlockState<N::Primitives>,
400    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401        let anchor_hash = state.anchor().hash;
402        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405    }
406
407    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
408    ///
409    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
410    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
411    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
412    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413        &self,
414        range: impl RangeBounds<BlockNumber>,
415        fetch_from_db: S,
416        fetch_from_block_state: M,
417    ) -> ProviderResult<Vec<R>>
418    where
419        S: FnOnce(
420            &DatabaseProviderRO<N::DB, N>,
421            RangeInclusive<TxNumber>,
422        ) -> ProviderResult<Vec<R>>,
423        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424    {
425        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426        let provider = &self.storage_provider;
427
428        // Get the last block number stored in the storage which does NOT overlap with in-memory
429        // chain.
430        let last_database_block_number = in_mem_chain
431            .last()
432            .map(|b| Ok(b.anchor().number))
433            .unwrap_or_else(|| provider.last_block_number())?;
434
435        // Get the next tx number for the last block stored in the storage, which marks the start of
436        // the in-memory state.
437        let last_block_body_index = provider
438            .block_body_indices(last_database_block_number)?
439            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442        let (start, end) = self.convert_range_bounds(range, || {
443            in_mem_chain
444                .iter()
445                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446                .sum::<u64>() +
447                last_block_body_index.last_tx_num()
448        });
449
450        if start > end {
451            return Ok(vec![])
452        }
453
454        let mut tx_range = start..=end;
455
456        // If the range is entirely before the first in-memory transaction number, fetch from
457        // storage
458        if *tx_range.end() < in_memory_tx_num {
459            return fetch_from_db(provider, tx_range);
460        }
461
462        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464        // If the range spans storage and memory, get elements from storage first.
465        if *tx_range.start() < in_memory_tx_num {
466            // Determine the range that needs to be fetched from storage.
467            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469            // Set the remaining transaction range for in-memory
470            tx_range = in_memory_tx_num..=*tx_range.end();
471
472            items.extend(fetch_from_db(provider, db_range)?);
473        }
474
475        // Iterate from the lowest block to the highest in-memory chain
476        for block_state in in_mem_chain.iter().rev() {
477            let block_tx_count =
478                block_state.block_ref().recovered_block().body().transactions().len();
479            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481            // If the transaction range start is equal or higher than the next block first
482            // transaction, advance
483            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484                in_memory_tx_num += block_tx_count as u64;
485                continue
486            }
487
488            // This should only be more than 0 once, in case of a partial range inside a block.
489            let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491            items.extend(fetch_from_block_state(
492                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493                block_state,
494            )?);
495
496            in_memory_tx_num += block_tx_count as u64;
497
498            // Break if the range has been fully processed
499            if in_memory_tx_num > *tx_range.end() {
500                break
501            }
502
503            // Set updated range
504            tx_range = in_memory_tx_num..=*tx_range.end();
505        }
506
507        Ok(items)
508    }
509
510    /// Fetches data from either in-memory state or persistent storage by transaction
511    /// [`HashOrNumber`].
512    fn get_in_memory_or_storage_by_tx<S, M, R>(
513        &self,
514        id: HashOrNumber,
515        fetch_from_db: S,
516        fetch_from_block_state: M,
517    ) -> ProviderResult<Option<R>>
518    where
519        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521    {
522        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523        let provider = &self.storage_provider;
524
525        // Get the last block number stored in the database which does NOT overlap with in-memory
526        // chain.
527        let last_database_block_number = in_mem_chain
528            .last()
529            .map(|b| Ok(b.anchor().number))
530            .unwrap_or_else(|| provider.last_block_number())?;
531
532        // Get the next tx number for the last block stored in the database and consider it the
533        // first tx number of the in-memory state
534        let last_block_body_index = provider
535            .block_body_indices(last_database_block_number)?
536            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539        // If the transaction number is less than the first in-memory transaction number, make a
540        // database lookup
541        if let HashOrNumber::Number(id) = id &&
542            id < in_memory_tx_num
543        {
544            return fetch_from_db(provider)
545        }
546
547        // Iterate from the lowest block to the highest
548        for block_state in in_mem_chain.iter().rev() {
549            let executed_block = block_state.block_ref();
550            let block = executed_block.recovered_block();
551
552            for tx_index in 0..block.body().transactions().len() {
553                match id {
554                    HashOrNumber::Hash(tx_hash) => {
555                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557                        }
558                    }
559                    HashOrNumber::Number(id) => {
560                        if id == in_memory_tx_num {
561                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562                        }
563                    }
564                }
565
566                in_memory_tx_num += 1;
567            }
568        }
569
570        // Not found in-memory, so check database.
571        if let HashOrNumber::Hash(_) = id {
572            return fetch_from_db(provider)
573        }
574
575        Ok(None)
576    }
577
578    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
579    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580        &self,
581        id: BlockHashOrNumber,
582        fetch_from_db: S,
583        fetch_from_block_state: M,
584    ) -> ProviderResult<R>
585    where
586        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588    {
589        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590            return fetch_from_block_state(block_state)
591        }
592        fetch_from_db(&self.storage_provider)
593    }
594
595    /// Consumes the provider and returns a state provider for the specific block hash.
596    pub(crate) fn into_state_provider_at_block_hash(
597        self,
598        block_hash: BlockHash,
599    ) -> ProviderResult<StateProviderBox> {
600        let Self { storage_provider, head_block, .. } = self;
601        let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
602            let block_number = storage_provider
603                .block_number(block_hash)?
604                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605            storage_provider.try_into_history_at_block(block_number)
606        };
607        if let Some(Some(block_state)) =
608            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609        {
610            let anchor_hash = block_state.anchor().hash;
611            let latest_historical = into_history_at_block_hash(anchor_hash)?;
612            return Ok(Box::new(block_state.state_provider(latest_historical)));
613        }
614        into_history_at_block_hash(block_hash)
615    }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619    /// Ensures that the given block number is canonical (synced)
620    ///
621    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
622    /// out of range and would lead to invalid results, mainly during initial sync.
623    ///
624    /// Verifying the `block_number` would be expensive since we need to lookup sync table
625    /// Instead, we ensure that the `block_number` is within the range of the
626    /// [`Self::best_block_number`] which is updated when a block is synced.
627    #[inline]
628    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629        let latest = self.best_block_number()?;
630        if block_number > latest {
631            Err(ProviderError::HeaderNotFound(block_number.into()))
632        } else {
633            Ok(())
634        }
635    }
636}
637
638impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
639    type Primitives = N::Primitives;
640}
641
642impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
643    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
644        self.storage_provider.static_file_provider()
645    }
646
647    fn get_static_file_writer(
648        &self,
649        block: BlockNumber,
650        segment: StaticFileSegment,
651    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
652        self.storage_provider.get_static_file_writer(block, segment)
653    }
654}
655
656impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
657    type Header = HeaderTy<N>;
658
659    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
660        self.get_in_memory_or_storage_by_block(
661            block_hash.into(),
662            |db_provider| db_provider.header(block_hash),
663            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664        )
665    }
666
667    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
668        self.get_in_memory_or_storage_by_block(
669            num.into(),
670            |db_provider| db_provider.header_by_number(num),
671            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
672        )
673    }
674
675    fn headers_range(
676        &self,
677        range: impl RangeBounds<BlockNumber>,
678    ) -> ProviderResult<Vec<Self::Header>> {
679        self.get_in_memory_or_storage_by_block_range_while(
680            range,
681            |db_provider, range, _| db_provider.headers_range(range),
682            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
683            |_| true,
684        )
685    }
686
687    fn sealed_header(
688        &self,
689        number: BlockNumber,
690    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
691        self.get_in_memory_or_storage_by_block(
692            number.into(),
693            |db_provider| db_provider.sealed_header(number),
694            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
695        )
696    }
697
698    fn sealed_headers_range(
699        &self,
700        range: impl RangeBounds<BlockNumber>,
701    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
702        self.get_in_memory_or_storage_by_block_range_while(
703            range,
704            |db_provider, range, _| db_provider.sealed_headers_range(range),
705            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
706            |_| true,
707        )
708    }
709
710    fn sealed_headers_while(
711        &self,
712        range: impl RangeBounds<BlockNumber>,
713        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
714    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
715        self.get_in_memory_or_storage_by_block_range_while(
716            range,
717            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
718            |block_state, predicate| {
719                let header = block_state.block_ref().recovered_block().sealed_header();
720                predicate(header).then(|| header.clone())
721            },
722            predicate,
723        )
724    }
725}
726
727impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
728    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
729        self.get_in_memory_or_storage_by_block(
730            number.into(),
731            |db_provider| db_provider.block_hash(number),
732            |block_state| Ok(Some(block_state.hash())),
733        )
734    }
735
736    fn canonical_hashes_range(
737        &self,
738        start: BlockNumber,
739        end: BlockNumber,
740    ) -> ProviderResult<Vec<B256>> {
741        self.get_in_memory_or_storage_by_block_range_while(
742            start..end,
743            |db_provider, inclusive_range, _| {
744                db_provider
745                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
746            },
747            |block_state, _| Some(block_state.hash()),
748            |_| true,
749        )
750    }
751}
752
753impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
754    fn chain_info(&self) -> ProviderResult<ChainInfo> {
755        let best_number = self.best_block_number()?;
756        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
757    }
758
759    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
760        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
761    }
762
763    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
764        self.storage_provider.last_block_number()
765    }
766
767    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
768        self.get_in_memory_or_storage_by_block(
769            hash.into(),
770            |db_provider| db_provider.block_number(hash),
771            |block_state| Ok(Some(block_state.number())),
772        )
773    }
774}
775
776impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
777    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
778        Ok(self.canonical_in_memory_state.pending_block_num_hash())
779    }
780
781    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782        Ok(self.canonical_in_memory_state.get_safe_num_hash())
783    }
784
785    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
787    }
788}
789
790impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
791    type Block = BlockTy<N>;
792
793    fn find_block_by_hash(
794        &self,
795        hash: B256,
796        source: BlockSource,
797    ) -> ProviderResult<Option<Self::Block>> {
798        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
799            let Some(block) = self.get_in_memory_or_storage_by_block(
800                hash.into(),
801                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
802                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
803            )?
804        {
805            return Ok(Some(block))
806        }
807
808        if matches!(source, BlockSource::Pending | BlockSource::Any) {
809            return Ok(self
810                .canonical_in_memory_state
811                .pending_block()
812                .filter(|b| b.hash() == hash)
813                .map(|b| b.into_block()))
814        }
815
816        Ok(None)
817    }
818
819    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
820        self.get_in_memory_or_storage_by_block(
821            id,
822            |db_provider| db_provider.block(id),
823            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824        )
825    }
826
827    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
828        Ok(self.canonical_in_memory_state.pending_recovered_block())
829    }
830
831    fn pending_block_and_receipts(
832        &self,
833    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
834        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
835    }
836
837    /// Returns the block with senders with matching number or hash from database.
838    ///
839    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
840    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
841    ///
842    /// Returns `None` if block is not found.
843    fn recovered_block(
844        &self,
845        id: BlockHashOrNumber,
846        transaction_kind: TransactionVariant,
847    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
848        self.get_in_memory_or_storage_by_block(
849            id,
850            |db_provider| db_provider.recovered_block(id, transaction_kind),
851            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
852        )
853    }
854
855    fn sealed_block_with_senders(
856        &self,
857        id: BlockHashOrNumber,
858        transaction_kind: TransactionVariant,
859    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860        self.get_in_memory_or_storage_by_block(
861            id,
862            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
863            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864        )
865    }
866
867    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
868        self.get_in_memory_or_storage_by_block_range_while(
869            range,
870            |db_provider, range, _| db_provider.block_range(range),
871            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
872            |_| true,
873        )
874    }
875
876    fn block_with_senders_range(
877        &self,
878        range: RangeInclusive<BlockNumber>,
879    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
880        self.get_in_memory_or_storage_by_block_range_while(
881            range,
882            |db_provider, range, _| db_provider.block_with_senders_range(range),
883            |block_state, _| Some(block_state.block().recovered_block().clone()),
884            |_| true,
885        )
886    }
887
888    fn recovered_block_range(
889        &self,
890        range: RangeInclusive<BlockNumber>,
891    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892        self.get_in_memory_or_storage_by_block_range_while(
893            range,
894            |db_provider, range, _| db_provider.recovered_block_range(range),
895            |block_state, _| Some(block_state.block().recovered_block().clone()),
896            |_| true,
897        )
898    }
899
900    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
901        self.get_in_memory_or_storage_by_tx(
902            id.into(),
903            |db_provider| db_provider.block_by_transaction_id(id),
904            |_, _, block_state| Ok(Some(block_state.number())),
905        )
906    }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910    type Transaction = TxTy<N>;
911
912    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913        self.get_in_memory_or_storage_by_tx(
914            tx_hash.into(),
915            |db_provider| db_provider.transaction_id(tx_hash),
916            |_, tx_number, _| Ok(Some(tx_number)),
917        )
918    }
919
920    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921        self.get_in_memory_or_storage_by_tx(
922            id.into(),
923            |provider| provider.transaction_by_id(id),
924            |tx_index, _, block_state| {
925                Ok(block_state
926                    .block_ref()
927                    .recovered_block()
928                    .body()
929                    .transactions()
930                    .get(tx_index)
931                    .cloned())
932            },
933        )
934    }
935
936    fn transaction_by_id_unhashed(
937        &self,
938        id: TxNumber,
939    ) -> ProviderResult<Option<Self::Transaction>> {
940        self.get_in_memory_or_storage_by_tx(
941            id.into(),
942            |provider| provider.transaction_by_id_unhashed(id),
943            |tx_index, _, block_state| {
944                Ok(block_state
945                    .block_ref()
946                    .recovered_block()
947                    .body()
948                    .transactions()
949                    .get(tx_index)
950                    .cloned())
951            },
952        )
953    }
954
955    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957            return Ok(Some(tx))
958        }
959
960        self.storage_provider.transaction_by_hash(hash)
961    }
962
963    fn transaction_by_hash_with_meta(
964        &self,
965        tx_hash: TxHash,
966    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967        if let Some((tx, meta)) =
968            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969        {
970            return Ok(Some((tx, meta)))
971        }
972
973        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974    }
975
976    fn transactions_by_block(
977        &self,
978        id: BlockHashOrNumber,
979    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
980        self.get_in_memory_or_storage_by_block(
981            id,
982            |provider| provider.transactions_by_block(id),
983            |block_state| {
984                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
985            },
986        )
987    }
988
989    fn transactions_by_block_range(
990        &self,
991        range: impl RangeBounds<BlockNumber>,
992    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
993        self.get_in_memory_or_storage_by_block_range_while(
994            range,
995            |db_provider, range, _| db_provider.transactions_by_block_range(range),
996            |block_state, _| {
997                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
998            },
999            |_| true,
1000        )
1001    }
1002
1003    fn transactions_by_tx_range(
1004        &self,
1005        range: impl RangeBounds<TxNumber>,
1006    ) -> ProviderResult<Vec<Self::Transaction>> {
1007        self.get_in_memory_or_storage_by_tx_range(
1008            range,
1009            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1010            |index_range, block_state| {
1011                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1012                    .to_vec())
1013            },
1014        )
1015    }
1016
1017    fn senders_by_tx_range(
1018        &self,
1019        range: impl RangeBounds<TxNumber>,
1020    ) -> ProviderResult<Vec<Address>> {
1021        self.get_in_memory_or_storage_by_tx_range(
1022            range,
1023            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1024            |index_range, block_state| {
1025                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1026            },
1027        )
1028    }
1029
1030    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1031        self.get_in_memory_or_storage_by_tx(
1032            id.into(),
1033            |provider| provider.transaction_sender(id),
1034            |tx_index, _, block_state| {
1035                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1036            },
1037        )
1038    }
1039}
1040
1041impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1042    type Receipt = ReceiptTy<N>;
1043
1044    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1045        self.get_in_memory_or_storage_by_tx(
1046            id.into(),
1047            |provider| provider.receipt(id),
1048            |tx_index, _, block_state| {
1049                Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1050            },
1051        )
1052    }
1053
1054    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1055        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1056            let executed_block = block_state.block_ref();
1057            let block = executed_block.recovered_block();
1058            let receipts = block_state.executed_block_receipts_ref();
1059
1060            // assuming 1:1 correspondence between transactions and receipts
1061            debug_assert_eq!(
1062                block.body().transactions().len(),
1063                receipts.len(),
1064                "Mismatch between transaction and receipt count"
1065            );
1066
1067            if let Some(tx_index) =
1068                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1069            {
1070                // safe to use tx_index for receipts due to 1:1 correspondence
1071                return Ok(receipts.get(tx_index).cloned());
1072            }
1073        }
1074
1075        self.storage_provider.receipt_by_hash(hash)
1076    }
1077
1078    fn receipts_by_block(
1079        &self,
1080        block: BlockHashOrNumber,
1081    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1082        self.get_in_memory_or_storage_by_block(
1083            block,
1084            |db_provider| db_provider.receipts_by_block(block),
1085            |block_state| Ok(Some(block_state.executed_block_receipts())),
1086        )
1087    }
1088
1089    fn receipts_by_tx_range(
1090        &self,
1091        range: impl RangeBounds<TxNumber>,
1092    ) -> ProviderResult<Vec<Self::Receipt>> {
1093        self.get_in_memory_or_storage_by_tx_range(
1094            range,
1095            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1096            |index_range, block_state| {
1097                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1098            },
1099        )
1100    }
1101
1102    fn receipts_by_block_range(
1103        &self,
1104        block_range: RangeInclusive<BlockNumber>,
1105    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1106        self.storage_provider.receipts_by_block_range(block_range)
1107    }
1108}
1109
1110impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1111    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1112        match block {
1113            BlockId::Hash(rpc_block_hash) => {
1114                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1115                if receipts.is_none() &&
1116                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1117                    let Some(state) = self
1118                        .head_block
1119                        .as_ref()
1120                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121                {
1122                    receipts = Some(state.executed_block_receipts());
1123                }
1124                Ok(receipts)
1125            }
1126            BlockId::Number(num_tag) => match num_tag {
1127                BlockNumberOrTag::Pending => Ok(self
1128                    .canonical_in_memory_state
1129                    .pending_state()
1130                    .map(|block_state| block_state.executed_block_receipts())),
1131                _ => {
1132                    if let Some(num) = self.convert_block_number(num_tag)? {
1133                        self.receipts_by_block(num.into())
1134                    } else {
1135                        Ok(None)
1136                    }
1137                }
1138            },
1139        }
1140    }
1141}
1142
1143impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1144    fn block_body_indices(
1145        &self,
1146        number: BlockNumber,
1147    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1148        self.get_in_memory_or_storage_by_block(
1149            number.into(),
1150            |db_provider| db_provider.block_body_indices(number),
1151            |block_state| {
1152                // Find the last block indices on database
1153                let last_storage_block_number = block_state.anchor().number;
1154                let mut stored_indices = self
1155                    .storage_provider
1156                    .block_body_indices(last_storage_block_number)?
1157                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1158
1159                // Prepare our block indices
1160                stored_indices.first_tx_num = stored_indices.next_tx_num();
1161                stored_indices.tx_count = 0;
1162
1163                // Iterate from the lowest block in memory until our target block
1164                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1165                    let block_tx_count =
1166                        state.block_ref().recovered_block().body().transactions().len() as u64;
1167                    if state.block_ref().recovered_block().number() == number {
1168                        stored_indices.tx_count = block_tx_count;
1169                    } else {
1170                        stored_indices.first_tx_num += block_tx_count;
1171                    }
1172                }
1173
1174                Ok(Some(stored_indices))
1175            },
1176        )
1177    }
1178
1179    fn block_body_indices_range(
1180        &self,
1181        range: RangeInclusive<BlockNumber>,
1182    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1183        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1184    }
1185}
1186
1187impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1188    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1189        self.storage_provider.get_stage_checkpoint(id)
1190    }
1191
1192    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1193        self.storage_provider.get_stage_checkpoint_progress(id)
1194    }
1195
1196    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1197        self.storage_provider.get_all_checkpoints()
1198    }
1199}
1200
1201impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1202    fn get_prune_checkpoint(
1203        &self,
1204        segment: PruneSegment,
1205    ) -> ProviderResult<Option<PruneCheckpoint>> {
1206        self.storage_provider.get_prune_checkpoint(segment)
1207    }
1208
1209    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1210        self.storage_provider.get_prune_checkpoints()
1211    }
1212}
1213
1214impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1215    type ChainSpec = N::ChainSpec;
1216
1217    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1218        ChainSpecProvider::chain_spec(&self.storage_provider)
1219    }
1220}
1221
1222impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1223    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1224        match id {
1225            BlockId::Number(num) => self.block_by_number_or_tag(num),
1226            BlockId::Hash(hash) => {
1227                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1228                // so not at the provider level?
1229                // if we decide to do this at a higher level, then we can make this an automatic
1230                // trait impl
1231                if Some(true) == hash.require_canonical {
1232                    // check the database, canonical blocks are only stored in the database
1233                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1234                } else {
1235                    self.block_by_hash(hash.block_hash)
1236                }
1237            }
1238        }
1239    }
1240
1241    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1242        Ok(match id {
1243            BlockNumberOrTag::Latest => {
1244                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1245            }
1246            BlockNumberOrTag::Finalized => {
1247                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1248            }
1249            BlockNumberOrTag::Safe => {
1250                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1251            }
1252            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1253            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1254
1255            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1256        })
1257    }
1258
1259    fn sealed_header_by_number_or_tag(
1260        &self,
1261        id: BlockNumberOrTag,
1262    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1263        match id {
1264            BlockNumberOrTag::Latest => {
1265                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1266            }
1267            BlockNumberOrTag::Finalized => {
1268                Ok(self.canonical_in_memory_state.get_finalized_header())
1269            }
1270            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1271            BlockNumberOrTag::Earliest => self
1272                .header_by_number(self.earliest_block_number()?)?
1273                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1274            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1275            BlockNumberOrTag::Number(num) => self
1276                .header_by_number(num)?
1277                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1278        }
1279    }
1280
1281    fn sealed_header_by_id(
1282        &self,
1283        id: BlockId,
1284    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1285        Ok(match id {
1286            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1287            BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1288        })
1289    }
1290
1291    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1292        Ok(match id {
1293            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1294            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1295        })
1296    }
1297}
1298
1299impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1300    fn storage_changeset(
1301        &self,
1302        block_number: BlockNumber,
1303    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1304        if let Some(state) =
1305            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1306        {
1307            let changesets = state
1308                .block()
1309                .execution_output
1310                .state
1311                .reverts
1312                .clone()
1313                .to_plain_state_reverts()
1314                .storage
1315                .into_iter()
1316                .flatten()
1317                .flat_map(|revert: PlainStorageRevert| {
1318                    revert.storage_revert.into_iter().map(move |(key, value)| {
1319                        (
1320                            BlockNumberAddress((block_number, revert.address)),
1321                            StorageEntry { key: key.into(), value: value.to_previous_value() },
1322                        )
1323                    })
1324                })
1325                .collect();
1326            Ok(changesets)
1327        } else {
1328            // Perform checks on whether or not changesets exist for the block.
1329
1330            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1331            let storage_history_exists = self
1332                .storage_provider
1333                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1334                .and_then(|checkpoint| {
1335                    // return true if the block number is ahead of the prune checkpoint.
1336                    //
1337                    // The checkpoint stores the highest pruned block number, so we should make
1338                    // sure the block_number is strictly greater.
1339                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1340                })
1341                .unwrap_or(true);
1342
1343            if !storage_history_exists {
1344                return Err(ProviderError::StateAtBlockPruned(block_number))
1345            }
1346
1347            self.storage_provider.storage_changeset(block_number)
1348        }
1349    }
1350
1351    fn get_storage_before_block(
1352        &self,
1353        block_number: BlockNumber,
1354        address: Address,
1355        storage_key: B256,
1356    ) -> ProviderResult<Option<StorageEntry>> {
1357        if let Some(state) =
1358            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1359        {
1360            let changeset = state
1361                .block_ref()
1362                .execution_output
1363                .state
1364                .reverts
1365                .clone()
1366                .to_plain_state_reverts()
1367                .storage
1368                .into_iter()
1369                .flatten()
1370                .find_map(|revert: PlainStorageRevert| {
1371                    if revert.address != address {
1372                        return None
1373                    }
1374                    revert.storage_revert.into_iter().find_map(|(key, value)| {
1375                        let key = key.into();
1376                        (key == storage_key)
1377                            .then(|| StorageEntry { key, value: value.to_previous_value() })
1378                    })
1379                });
1380            Ok(changeset)
1381        } else {
1382            let storage_history_exists = self
1383                .storage_provider
1384                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1385                .and_then(|checkpoint| {
1386                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1387                })
1388                .unwrap_or(true);
1389
1390            if !storage_history_exists {
1391                return Err(ProviderError::StateAtBlockPruned(block_number))
1392            }
1393
1394            self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1395        }
1396    }
1397
1398    fn storage_changesets_range(
1399        &self,
1400        range: impl RangeBounds<BlockNumber>,
1401    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1402        let range = to_range(range);
1403        let mut changesets = Vec::new();
1404        let database_start = range.start;
1405        let mut database_end = range.end;
1406
1407        if let Some(head_block) = &self.head_block {
1408            database_end = head_block.anchor().number;
1409
1410            let chain = head_block.chain().collect::<Vec<_>>();
1411            for state in chain {
1412                let block_changesets = state
1413                    .block_ref()
1414                    .execution_output
1415                    .state
1416                    .reverts
1417                    .clone()
1418                    .to_plain_state_reverts()
1419                    .storage
1420                    .into_iter()
1421                    .flatten()
1422                    .flat_map(|revert: PlainStorageRevert| {
1423                        revert.storage_revert.into_iter().map(move |(key, value)| {
1424                            (
1425                                BlockNumberAddress((state.number(), revert.address)),
1426                                StorageEntry { key: key.into(), value: value.to_previous_value() },
1427                            )
1428                        })
1429                    });
1430
1431                changesets.extend(block_changesets);
1432            }
1433        }
1434
1435        if database_start < database_end {
1436            let storage_history_exists = self
1437                .storage_provider
1438                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1439                .and_then(|checkpoint| {
1440                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1441                })
1442                .unwrap_or(true);
1443
1444            if !storage_history_exists {
1445                return Err(ProviderError::StateAtBlockPruned(database_start))
1446            }
1447
1448            let db_changesets = self
1449                .storage_provider
1450                .storage_changesets_range(database_start..=database_end - 1)?;
1451            changesets.extend(db_changesets);
1452        }
1453
1454        changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1455
1456        Ok(changesets)
1457    }
1458
1459    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1460        let mut count = 0;
1461        if let Some(head_block) = &self.head_block {
1462            for state in head_block.chain() {
1463                count += state
1464                    .block_ref()
1465                    .execution_output
1466                    .state
1467                    .reverts
1468                    .clone()
1469                    .to_plain_state_reverts()
1470                    .storage
1471                    .into_iter()
1472                    .flatten()
1473                    .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1474                    .sum::<usize>();
1475            }
1476        }
1477
1478        count += self.storage_provider.storage_changeset_count()?;
1479
1480        Ok(count)
1481    }
1482}
1483
1484impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1485    fn account_block_changeset(
1486        &self,
1487        block_number: BlockNumber,
1488    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1489        if let Some(state) =
1490            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1491        {
1492            let changesets = state
1493                .block_ref()
1494                .execution_output
1495                .state
1496                .reverts
1497                .clone()
1498                .to_plain_state_reverts()
1499                .accounts
1500                .into_iter()
1501                .flatten()
1502                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1503                .collect();
1504            Ok(changesets)
1505        } else {
1506            // Perform checks on whether or not changesets exist for the block.
1507
1508            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1509            let account_history_exists = self
1510                .storage_provider
1511                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1512                .and_then(|checkpoint| {
1513                    // return true if the block number is ahead of the prune checkpoint.
1514                    //
1515                    // The checkpoint stores the highest pruned block number, so we should make
1516                    // sure the block_number is strictly greater.
1517                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1518                })
1519                .unwrap_or(true);
1520
1521            if !account_history_exists {
1522                return Err(ProviderError::StateAtBlockPruned(block_number))
1523            }
1524
1525            self.storage_provider.account_block_changeset(block_number)
1526        }
1527    }
1528
1529    fn get_account_before_block(
1530        &self,
1531        block_number: BlockNumber,
1532        address: Address,
1533    ) -> ProviderResult<Option<AccountBeforeTx>> {
1534        if let Some(state) =
1535            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1536        {
1537            // Search in-memory state for the account changeset
1538            let changeset = state
1539                .block_ref()
1540                .execution_output
1541                .state
1542                .reverts
1543                .clone()
1544                .to_plain_state_reverts()
1545                .accounts
1546                .into_iter()
1547                .flatten()
1548                .find(|(addr, _)| addr == &address)
1549                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1550            Ok(changeset)
1551        } else {
1552            // Perform checks on whether or not changesets exist for the block.
1553            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1554            let account_history_exists = self
1555                .storage_provider
1556                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1557                .and_then(|checkpoint| {
1558                    // return true if the block number is ahead of the prune checkpoint.
1559                    //
1560                    // The checkpoint stores the highest pruned block number, so we should make
1561                    // sure the block_number is strictly greater.
1562                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1563                })
1564                .unwrap_or(true);
1565
1566            if !account_history_exists {
1567                return Err(ProviderError::StateAtBlockPruned(block_number))
1568            }
1569
1570            // Delegate to the storage provider for database lookups
1571            self.storage_provider.get_account_before_block(block_number, address)
1572        }
1573    }
1574
1575    fn account_changesets_range(
1576        &self,
1577        range: impl core::ops::RangeBounds<BlockNumber>,
1578    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1579        let range = to_range(range);
1580        let mut changesets = Vec::new();
1581        let database_start = range.start;
1582        let mut database_end = range.end;
1583
1584        // Check which blocks in the range are in memory
1585        if let Some(head_block) = &self.head_block {
1586            // the anchor is the end of the db range
1587            database_end = head_block.anchor().number;
1588
1589            let chain = head_block.chain().collect::<Vec<_>>();
1590            for state in chain {
1591                // found block in memory, collect its changesets
1592                let block_changesets = state
1593                    .block_ref()
1594                    .execution_output
1595                    .state
1596                    .reverts
1597                    .clone()
1598                    .to_plain_state_reverts()
1599                    .accounts
1600                    .into_iter()
1601                    .flatten()
1602                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1603
1604                for changeset in block_changesets {
1605                    changesets.push((state.number(), changeset));
1606                }
1607            }
1608        }
1609
1610        // get changesets from database for remaining blocks
1611        if database_start < database_end {
1612            // check if account history is pruned for these blocks
1613            let account_history_exists = self
1614                .storage_provider
1615                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1616                .and_then(|checkpoint| {
1617                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1618                })
1619                .unwrap_or(true);
1620
1621            if !account_history_exists {
1622                return Err(ProviderError::StateAtBlockPruned(database_start))
1623            }
1624
1625            let db_changesets =
1626                self.storage_provider.account_changesets_range(database_start..database_end)?;
1627            changesets.extend(db_changesets);
1628        }
1629
1630        changesets.sort_by_key(|(block_num, _)| *block_num);
1631
1632        Ok(changesets)
1633    }
1634
1635    fn account_changeset_count(&self) -> ProviderResult<usize> {
1636        // Count changesets from in-memory state
1637        let mut count = 0;
1638        if let Some(head_block) = &self.head_block {
1639            for state in head_block.chain() {
1640                count += state
1641                    .block_ref()
1642                    .execution_output
1643                    .state
1644                    .reverts
1645                    .clone()
1646                    .to_plain_state_reverts()
1647                    .accounts
1648                    .len();
1649            }
1650        }
1651
1652        // Add changesets from storage provider
1653        count += self.storage_provider.account_changeset_count()?;
1654
1655        Ok(count)
1656    }
1657}
1658
1659impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1660    /// Get basic account information.
1661    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1662        // use latest state provider
1663        let state_provider = self.latest_ref()?;
1664        state_provider.basic_account(address)
1665    }
1666}
1667
1668impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1669    type Receipt = ReceiptTy<N>;
1670
1671    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1672    ///
1673    /// If data for the block does not exist, this will return [`None`].
1674    ///
1675    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1676    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1677    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1678    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1679    /// first place.
1680    fn get_state(
1681        &self,
1682        block: BlockNumber,
1683    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1684        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1685            let state = state.block_ref().execution_outcome().clone();
1686            Ok(Some(ExecutionOutcome::from((state, block))))
1687        } else {
1688            Self::get_state(self, block..=block)
1689        }
1690    }
1691}
1692
1693#[cfg(test)]
1694mod tests {
1695    use crate::{
1696        providers::blockchain_provider::BlockchainProvider,
1697        test_utils::create_test_provider_factory, BlockWriter,
1698    };
1699    use alloy_eips::BlockHashOrNumber;
1700    use alloy_primitives::B256;
1701    use itertools::Itertools;
1702    use rand::Rng;
1703    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1704    use reth_db_api::models::AccountBeforeTx;
1705    use reth_ethereum_primitives::Block;
1706    use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1707    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1708    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1709    use reth_testing_utils::generators::{
1710        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1711    };
1712    use revm_database::BundleState;
1713    use std::{
1714        ops::{Bound, Range, RangeBounds},
1715        sync::Arc,
1716    };
1717
1718    const TEST_BLOCKS_COUNT: usize = 5;
1719
1720    fn random_blocks(
1721        rng: &mut impl Rng,
1722        database_blocks: usize,
1723        in_memory_blocks: usize,
1724        requests_count: Option<Range<u8>>,
1725        withdrawals_count: Option<Range<u8>>,
1726        tx_count: impl RangeBounds<u8>,
1727    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1728        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1729
1730        let tx_start = match tx_count.start_bound() {
1731            Bound::Included(&n) | Bound::Excluded(&n) => n,
1732            Bound::Unbounded => u8::MIN,
1733        };
1734        let tx_end = match tx_count.end_bound() {
1735            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1736            Bound::Unbounded => u8::MAX,
1737        };
1738
1739        let blocks = random_block_range(
1740            rng,
1741            0..=block_range,
1742            BlockRangeParams {
1743                parent: Some(B256::ZERO),
1744                tx_count: tx_start..tx_end,
1745                requests_count,
1746                withdrawals_count,
1747            },
1748        );
1749        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1750        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1751    }
1752
1753    #[test]
1754    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1755        // Initialize random number generator and provider factory
1756        let mut rng = generators::rng();
1757        let factory = create_test_provider_factory();
1758
1759        // Generate 10 random blocks and split into database and in-memory blocks
1760        let blocks = random_block_range(
1761            &mut rng,
1762            0..=10,
1763            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1764        );
1765        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1766
1767        // Insert first 5 blocks into the database
1768        let provider_rw = factory.provider_rw()?;
1769        for block in database_blocks {
1770            provider_rw.insert_block(
1771                &block.clone().try_recover().expect("failed to seal block with senders"),
1772            )?;
1773        }
1774        provider_rw.commit()?;
1775
1776        // Create a new provider
1777        let provider = BlockchainProvider::new(factory)?;
1778        let consistent_provider = provider.consistent_provider()?;
1779
1780        // Useful blocks
1781        let first_db_block = database_blocks.first().unwrap();
1782        let first_in_mem_block = in_memory_blocks.first().unwrap();
1783        let last_in_mem_block = in_memory_blocks.last().unwrap();
1784
1785        // No block in memory before setting in memory state
1786        assert_eq!(
1787            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1788            None
1789        );
1790        assert_eq!(
1791            consistent_provider
1792                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1793            None
1794        );
1795        // No pending block in memory
1796        assert_eq!(
1797            consistent_provider
1798                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1799            None
1800        );
1801
1802        // Insert first block into the in-memory state
1803        let in_memory_block_senders =
1804            first_in_mem_block.senders().expect("failed to recover senders");
1805        let chain = NewCanonicalChain::Commit {
1806            new: vec![ExecutedBlock {
1807                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1808                    first_in_mem_block.clone(),
1809                    in_memory_block_senders,
1810                )),
1811                ..Default::default()
1812            }],
1813        };
1814        consistent_provider.canonical_in_memory_state.update_chain(chain);
1815        let consistent_provider = provider.consistent_provider()?;
1816
1817        // Now the block should be found in memory
1818        assert_eq!(
1819            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1820            Some(first_in_mem_block.clone().into_block())
1821        );
1822        assert_eq!(
1823            consistent_provider
1824                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1825            Some(first_in_mem_block.clone().into_block())
1826        );
1827
1828        // Find the first block in database by hash
1829        assert_eq!(
1830            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1831            Some(first_db_block.clone().into_block())
1832        );
1833        assert_eq!(
1834            consistent_provider
1835                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1836            Some(first_db_block.clone().into_block())
1837        );
1838
1839        // No pending block in database
1840        assert_eq!(
1841            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1842            None
1843        );
1844
1845        // Insert the last block into the pending state
1846        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1847            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1848                last_in_mem_block.clone(),
1849                Default::default(),
1850            )),
1851            ..Default::default()
1852        });
1853
1854        // Now the last block should be found in memory
1855        assert_eq!(
1856            consistent_provider
1857                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1858            Some(last_in_mem_block.clone_block())
1859        );
1860
1861        Ok(())
1862    }
1863
1864    #[test]
1865    fn test_block_reader_block() -> eyre::Result<()> {
1866        // Initialize random number generator and provider factory
1867        let mut rng = generators::rng();
1868        let factory = create_test_provider_factory();
1869
1870        // Generate 10 random blocks and split into database and in-memory blocks
1871        let blocks = random_block_range(
1872            &mut rng,
1873            0..=10,
1874            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1875        );
1876        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1877
1878        // Insert first 5 blocks into the database
1879        let provider_rw = factory.provider_rw()?;
1880        for block in database_blocks {
1881            provider_rw.insert_block(
1882                &block.clone().try_recover().expect("failed to seal block with senders"),
1883            )?;
1884        }
1885        provider_rw.commit()?;
1886
1887        // Create a new provider
1888        let provider = BlockchainProvider::new(factory)?;
1889        let consistent_provider = provider.consistent_provider()?;
1890
1891        // First in memory block
1892        let first_in_mem_block = in_memory_blocks.first().unwrap();
1893        // First database block
1894        let first_db_block = database_blocks.first().unwrap();
1895
1896        // First in memory block should not be found yet as not integrated to the in-memory state
1897        assert_eq!(
1898            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1899            None
1900        );
1901        assert_eq!(
1902            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1903            None
1904        );
1905
1906        // Insert first block into the in-memory state
1907        let in_memory_block_senders =
1908            first_in_mem_block.senders().expect("failed to recover senders");
1909        let chain = NewCanonicalChain::Commit {
1910            new: vec![ExecutedBlock {
1911                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1912                    first_in_mem_block.clone(),
1913                    in_memory_block_senders,
1914                )),
1915                ..Default::default()
1916            }],
1917        };
1918        consistent_provider.canonical_in_memory_state.update_chain(chain);
1919
1920        let consistent_provider = provider.consistent_provider()?;
1921
1922        // First in memory block should be found
1923        assert_eq!(
1924            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1925            Some(first_in_mem_block.clone().into_block())
1926        );
1927        assert_eq!(
1928            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1929            Some(first_in_mem_block.clone().into_block())
1930        );
1931
1932        // First database block should be found
1933        assert_eq!(
1934            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1935            Some(first_db_block.clone().into_block())
1936        );
1937        assert_eq!(
1938            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1939            Some(first_db_block.clone().into_block())
1940        );
1941
1942        Ok(())
1943    }
1944
1945    #[test]
1946    fn test_changeset_reader() -> eyre::Result<()> {
1947        let mut rng = generators::rng();
1948
1949        let (database_blocks, in_memory_blocks) =
1950            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1951
1952        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1953        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1954        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1955
1956        let accounts = random_eoa_accounts(&mut rng, 2);
1957
1958        let (database_changesets, database_state) = random_changeset_range(
1959            &mut rng,
1960            &database_blocks,
1961            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1962            0..0,
1963            0..0,
1964        );
1965        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1966            &mut rng,
1967            &in_memory_blocks,
1968            database_state
1969                .iter()
1970                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1971            0..0,
1972            0..0,
1973        );
1974
1975        let factory = create_test_provider_factory();
1976
1977        let provider_rw = factory.provider_rw()?;
1978        provider_rw.append_blocks_with_state(
1979            database_blocks
1980                .into_iter()
1981                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1982                .collect(),
1983            &ExecutionOutcome {
1984                bundle: BundleState::new(
1985                    database_state.into_iter().map(|(address, (account, _))| {
1986                        (address, None, Some(account.into()), Default::default())
1987                    }),
1988                    database_changesets.iter().map(|block_changesets| {
1989                        block_changesets.iter().map(|(address, account, _)| {
1990                            (*address, Some(Some((*account).into())), [])
1991                        })
1992                    }),
1993                    Vec::new(),
1994                ),
1995                first_block: first_database_block,
1996                ..Default::default()
1997            },
1998            Default::default(),
1999        )?;
2000        provider_rw.commit()?;
2001
2002        let provider = BlockchainProvider::new(factory)?;
2003
2004        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2005        let chain = NewCanonicalChain::Commit {
2006            new: vec![in_memory_blocks
2007                .first()
2008                .map(|block| {
2009                    let senders = block.senders().expect("failed to recover senders");
2010                    ExecutedBlock {
2011                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
2012                            block.clone(),
2013                            senders,
2014                        )),
2015                        execution_output: Arc::new(BlockExecutionOutput {
2016                            state: BundleState::new(
2017                                in_memory_state.into_iter().map(|(address, (account, _))| {
2018                                    (address, None, Some(account.into()), Default::default())
2019                                }),
2020                                [in_memory_changesets.iter().map(|(address, account, _)| {
2021                                    (*address, Some(Some((*account).into())), Vec::new())
2022                                })],
2023                                [],
2024                            ),
2025                            result: BlockExecutionResult {
2026                                receipts: Default::default(),
2027                                requests: Default::default(),
2028                                gas_used: 0,
2029                                blob_gas_used: 0,
2030                            },
2031                        }),
2032                        ..Default::default()
2033                    }
2034                })
2035                .unwrap()],
2036        };
2037        provider.canonical_in_memory_state.update_chain(chain);
2038
2039        let consistent_provider = provider.consistent_provider()?;
2040
2041        assert_eq!(
2042            consistent_provider.account_block_changeset(last_database_block).unwrap(),
2043            database_changesets
2044                .into_iter()
2045                .next_back()
2046                .unwrap()
2047                .into_iter()
2048                .sorted_by_key(|(address, _, _)| *address)
2049                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2050                .collect::<Vec<_>>()
2051        );
2052        assert_eq!(
2053            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2054            in_memory_changesets
2055                .into_iter()
2056                .sorted_by_key(|(address, _, _)| *address)
2057                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2058                .collect::<Vec<_>>()
2059        );
2060
2061        Ok(())
2062    }
2063}