Skip to main content

reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5    BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8    TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13    HashOrNumber,
14};
15use alloy_primitives::{
16    map::{hash_map, HashMap},
17    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30    StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
40/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
41/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
42/// this provider.
43///
44/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
45/// time-out.
46#[derive(Debug)]
47#[doc(hidden)] // triggers ICE for `cargo docs`
48pub struct ConsistentProvider<N: ProviderNodeTypes> {
49    /// Storage provider.
50    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51    /// Head block at time of [`Self`] creation
52    head_block: Option<Arc<BlockState<N::Primitives>>>,
53    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
54    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] based on the given storage and account
218    /// changesets.
219    ///
220    /// Storage changeset keys are always plain (unhashed). Current values are read via
221    /// [`StateProvider::storage`], which handles hashing internally when `use_hashed_state` is
222    /// enabled.
223    fn populate_bundle_state(
224        &self,
225        account_changeset: Vec<(u64, AccountBeforeTx)>,
226        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
227        block_range_end: BlockNumber,
228    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
229        let mut state: BundleStateInit = HashMap::default();
230        let mut reverts: RevertsInit = HashMap::default();
231        let state_provider = self.state_by_block_number_ref(block_range_end)?;
232
233        // add account changeset changes
234        for (block_number, account_before) in account_changeset.into_iter().rev() {
235            let AccountBeforeTx { info: old_info, address } = account_before;
236            match state.entry(address) {
237                hash_map::Entry::Vacant(entry) => {
238                    let new_info = state_provider.basic_account(&address)?;
239                    entry.insert((old_info, new_info, HashMap::default()));
240                }
241                hash_map::Entry::Occupied(mut entry) => {
242                    // overwrite old account state.
243                    entry.get_mut().0 = old_info;
244                }
245            }
246            // insert old info into reverts.
247            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
248        }
249
250        // add storage changeset changes
251        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
252            let BlockNumberAddress((block_number, address)) = block_and_address;
253            // get account state or insert from plain state.
254            let account_state = match state.entry(address) {
255                hash_map::Entry::Vacant(entry) => {
256                    let present_info = state_provider.basic_account(&address)?;
257                    entry.insert((present_info, present_info, HashMap::default()))
258                }
259                hash_map::Entry::Occupied(entry) => entry.into_mut(),
260            };
261
262            // match storage.
263            match account_state.2.entry(old_storage.key) {
264                hash_map::Entry::Vacant(entry) => {
265                    let new_storage_value =
266                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
267                    entry.insert((old_storage.value, new_storage_value));
268                }
269                hash_map::Entry::Occupied(mut entry) => {
270                    entry.get_mut().0 = old_storage.value;
271                }
272            };
273
274            reverts
275                .entry(block_number)
276                .or_default()
277                .entry(address)
278                .or_default()
279                .1
280                .push(old_storage);
281        }
282
283        Ok((state, reverts))
284    }
285
286    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
287    /// is met.
288    ///
289    /// Creates a snapshot of the in-memory chain state and database provider to prevent
290    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
291    /// recent in-memory blocks in case of overlaps.
292    ///
293    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
294    ///   user to retrieve the required items from the database using [`RangeInclusive`].
295    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
296    ///   state, allowing for selection or filtering for the desired data.
297    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298        &self,
299        range: impl RangeBounds<BlockNumber>,
300        fetch_db_range: F,
301        map_block_state_item: G,
302        mut predicate: P,
303    ) -> ProviderResult<Vec<T>>
304    where
305        F: FnOnce(
306            &DatabaseProviderRO<N::DB, N>,
307            RangeInclusive<BlockNumber>,
308            &mut P,
309        ) -> ProviderResult<Vec<T>>,
310        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311        P: FnMut(&T) -> bool,
312    {
313        // Each one provides a snapshot at the time of instantiation, but its order matters.
314        //
315        // If we acquire first the database provider, it's possible that before the in-memory chain
316        // snapshot is instantiated, it will flush blocks to disk. This would
317        // mean that our database provider would not have access to the flushed blocks (since it's
318        // working under an older view), while the in-memory state may have deleted them
319        // entirely. Resulting in gaps on the range.
320        let mut in_memory_chain =
321            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322        let db_provider = &self.storage_provider;
323
324        let (start, end) = self.convert_range_bounds(range, || {
325            // the first block is the highest one.
326            in_memory_chain
327                .first()
328                .map(|b| b.number())
329                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330        });
331
332        if start > end {
333            return Ok(vec![])
334        }
335
336        // Split range into storage_range and in-memory range. If the in-memory range is not
337        // necessary drop it early.
338        //
339        // The last block of `in_memory_chain` is the lowest block number.
340        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341            Some(lowest_memory_block) if lowest_memory_block <= end => {
342                let highest_memory_block =
343                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345                // Database will for a time overlap with in-memory-chain blocks. In
346                // case of a re-org, it can mean that the database blocks are of a forked chain, and
347                // so, we should prioritize the in-memory overlapped blocks.
348                let in_memory_range =
349                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351                // If requested range is in the middle of the in-memory range, remove the necessary
352                // lowest blocks
353                in_memory_chain.truncate(
354                    in_memory_chain
355                        .len()
356                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357                );
358
359                let storage_range =
360                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362                (Some((in_memory_chain, in_memory_range)), storage_range)
363            }
364            _ => {
365                // Drop the in-memory chain so we don't hold blocks in memory.
366                drop(in_memory_chain);
367
368                (None, Some(start..=end))
369            }
370        };
371
372        let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374        if let Some(storage_range) = storage_range {
375            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376            items.append(&mut db_items);
377
378            // The predicate was not met, if the number of items differs from the expected. So, we
379            // return what we have.
380            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381                return Ok(items)
382            }
383        }
384
385        if let Some((in_memory_chain, in_memory_range)) = in_memory {
386            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387                debug_assert!(num == block.number());
388                if let Some(item) = map_block_state_item(block, &mut predicate) {
389                    items.push(item);
390                } else {
391                    break
392                }
393            }
394        }
395
396        Ok(items)
397    }
398
399    /// This uses a given [`BlockState`] to initialize a state provider for that block.
400    fn block_state_provider_ref(
401        &self,
402        state: &BlockState<N::Primitives>,
403    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404        let anchor_hash = state.anchor().hash;
405        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408    }
409
410    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
411    ///
412    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
413    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
414    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
415    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416        &self,
417        range: impl RangeBounds<BlockNumber>,
418        fetch_from_db: S,
419        fetch_from_block_state: M,
420    ) -> ProviderResult<Vec<R>>
421    where
422        S: FnOnce(
423            &DatabaseProviderRO<N::DB, N>,
424            RangeInclusive<TxNumber>,
425        ) -> ProviderResult<Vec<R>>,
426        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427    {
428        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429        let provider = &self.storage_provider;
430
431        // Get the last block number stored in the storage which does NOT overlap with in-memory
432        // chain.
433        let last_database_block_number = in_mem_chain
434            .last()
435            .map(|b| Ok(b.anchor().number))
436            .unwrap_or_else(|| provider.last_block_number())?;
437
438        // Get the next tx number for the last block stored in the storage, which marks the start of
439        // the in-memory state.
440        let last_block_body_index = provider
441            .block_body_indices(last_database_block_number)?
442            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445        let (start, end) = self.convert_range_bounds(range, || {
446            in_mem_chain
447                .iter()
448                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449                .sum::<u64>() +
450                last_block_body_index.last_tx_num()
451        });
452
453        if start > end {
454            return Ok(vec![])
455        }
456
457        let mut tx_range = start..=end;
458
459        // If the range is entirely before the first in-memory transaction number, fetch from
460        // storage
461        if *tx_range.end() < in_memory_tx_num {
462            return fetch_from_db(provider, tx_range);
463        }
464
465        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467        // If the range spans storage and memory, get elements from storage first.
468        if *tx_range.start() < in_memory_tx_num {
469            // Determine the range that needs to be fetched from storage.
470            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472            // Set the remaining transaction range for in-memory
473            tx_range = in_memory_tx_num..=*tx_range.end();
474
475            items.extend(fetch_from_db(provider, db_range)?);
476        }
477
478        // Iterate from the lowest block to the highest in-memory chain
479        for block_state in in_mem_chain.iter().rev() {
480            let block_tx_count =
481                block_state.block_ref().recovered_block().body().transactions().len();
482            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484            // If the transaction range start is equal or higher than the next block first
485            // transaction, advance
486            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487                in_memory_tx_num += block_tx_count as u64;
488                continue
489            }
490
491            // This should only be more than 0 once, in case of a partial range inside a block.
492            let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494            items.extend(fetch_from_block_state(
495                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496                block_state,
497            )?);
498
499            in_memory_tx_num += block_tx_count as u64;
500
501            // Break if the range has been fully processed
502            if in_memory_tx_num > *tx_range.end() {
503                break
504            }
505
506            // Set updated range
507            tx_range = in_memory_tx_num..=*tx_range.end();
508        }
509
510        Ok(items)
511    }
512
513    /// Fetches data from either in-memory state or persistent storage by transaction
514    /// [`HashOrNumber`].
515    fn get_in_memory_or_storage_by_tx<S, M, R>(
516        &self,
517        id: HashOrNumber,
518        fetch_from_db: S,
519        fetch_from_block_state: M,
520    ) -> ProviderResult<Option<R>>
521    where
522        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524    {
525        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526        let provider = &self.storage_provider;
527
528        // Get the last block number stored in the database which does NOT overlap with in-memory
529        // chain.
530        let last_database_block_number = in_mem_chain
531            .last()
532            .map(|b| Ok(b.anchor().number))
533            .unwrap_or_else(|| provider.last_block_number())?;
534
535        // Get the next tx number for the last block stored in the database and consider it the
536        // first tx number of the in-memory state
537        let last_block_body_index = provider
538            .block_body_indices(last_database_block_number)?
539            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542        // If the transaction number is less than the first in-memory transaction number, make a
543        // database lookup
544        if let HashOrNumber::Number(id) = id &&
545            id < in_memory_tx_num
546        {
547            return fetch_from_db(provider)
548        }
549
550        // Iterate from the lowest block to the highest
551        for block_state in in_mem_chain.iter().rev() {
552            let executed_block = block_state.block_ref();
553            let block = executed_block.recovered_block();
554
555            for tx_index in 0..block.body().transactions().len() {
556                match id {
557                    HashOrNumber::Hash(tx_hash) => {
558                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560                        }
561                    }
562                    HashOrNumber::Number(id) => {
563                        if id == in_memory_tx_num {
564                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565                        }
566                    }
567                }
568
569                in_memory_tx_num += 1;
570            }
571        }
572
573        // Not found in-memory, so check database.
574        if let HashOrNumber::Hash(_) = id {
575            return fetch_from_db(provider)
576        }
577
578        Ok(None)
579    }
580
581    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
582    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583        &self,
584        id: BlockHashOrNumber,
585        fetch_from_db: S,
586        fetch_from_block_state: M,
587    ) -> ProviderResult<R>
588    where
589        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591    {
592        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593            return fetch_from_block_state(block_state)
594        }
595        fetch_from_db(&self.storage_provider)
596    }
597
598    /// Consumes the provider and returns a state provider for the specific block hash.
599    pub(crate) fn into_state_provider_at_block_hash(
600        self,
601        block_hash: BlockHash,
602    ) -> ProviderResult<StateProviderBox> {
603        let Self { storage_provider, head_block, .. } = self;
604        let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
605            let block_number = storage_provider
606                .block_number(block_hash)?
607                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
608            storage_provider.try_into_history_at_block(block_number)
609        };
610        if let Some(Some(block_state)) =
611            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
612        {
613            let anchor_hash = block_state.anchor().hash;
614            let latest_historical = into_history_at_block_hash(anchor_hash)?;
615            return Ok(Box::new(block_state.state_provider(latest_historical)));
616        }
617        into_history_at_block_hash(block_hash)
618    }
619}
620
621impl<N: ProviderNodeTypes> ConsistentProvider<N> {
622    /// Ensures that the given block number is canonical (synced)
623    ///
624    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
625    /// out of range and would lead to invalid results, mainly during initial sync.
626    ///
627    /// Verifying the `block_number` would be expensive since we need to lookup sync table
628    /// Instead, we ensure that the `block_number` is within the range of the
629    /// [`Self::best_block_number`] which is updated when a block is synced.
630    #[inline]
631    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
632        let latest = self.best_block_number()?;
633        if block_number > latest {
634            Err(ProviderError::HeaderNotFound(block_number.into()))
635        } else {
636            Ok(())
637        }
638    }
639}
640
641impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
642    type Primitives = N::Primitives;
643}
644
645impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
646    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
647        self.storage_provider.static_file_provider()
648    }
649
650    fn get_static_file_writer(
651        &self,
652        block: BlockNumber,
653        segment: StaticFileSegment,
654    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
655        self.storage_provider.get_static_file_writer(block, segment)
656    }
657}
658
659impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
660    type Header = HeaderTy<N>;
661
662    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
663        self.get_in_memory_or_storage_by_block(
664            block_hash.into(),
665            |db_provider| db_provider.header(block_hash),
666            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
667        )
668    }
669
670    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
671        self.get_in_memory_or_storage_by_block(
672            num.into(),
673            |db_provider| db_provider.header_by_number(num),
674            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
675        )
676    }
677
678    fn headers_range(
679        &self,
680        range: impl RangeBounds<BlockNumber>,
681    ) -> ProviderResult<Vec<Self::Header>> {
682        self.get_in_memory_or_storage_by_block_range_while(
683            range,
684            |db_provider, range, _| db_provider.headers_range(range),
685            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
686            |_| true,
687        )
688    }
689
690    fn sealed_header(
691        &self,
692        number: BlockNumber,
693    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
694        self.get_in_memory_or_storage_by_block(
695            number.into(),
696            |db_provider| db_provider.sealed_header(number),
697            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
698        )
699    }
700
701    fn sealed_headers_range(
702        &self,
703        range: impl RangeBounds<BlockNumber>,
704    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
705        self.get_in_memory_or_storage_by_block_range_while(
706            range,
707            |db_provider, range, _| db_provider.sealed_headers_range(range),
708            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
709            |_| true,
710        )
711    }
712
713    fn sealed_headers_while(
714        &self,
715        range: impl RangeBounds<BlockNumber>,
716        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
717    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
718        self.get_in_memory_or_storage_by_block_range_while(
719            range,
720            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
721            |block_state, predicate| {
722                let header = block_state.block_ref().recovered_block().sealed_header();
723                predicate(header).then(|| header.clone())
724            },
725            predicate,
726        )
727    }
728}
729
730impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
731    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
732        self.get_in_memory_or_storage_by_block(
733            number.into(),
734            |db_provider| db_provider.block_hash(number),
735            |block_state| Ok(Some(block_state.hash())),
736        )
737    }
738
739    fn canonical_hashes_range(
740        &self,
741        start: BlockNumber,
742        end: BlockNumber,
743    ) -> ProviderResult<Vec<B256>> {
744        self.get_in_memory_or_storage_by_block_range_while(
745            start..end,
746            |db_provider, inclusive_range, _| {
747                db_provider
748                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
749            },
750            |block_state, _| Some(block_state.hash()),
751            |_| true,
752        )
753    }
754}
755
756impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
757    fn chain_info(&self) -> ProviderResult<ChainInfo> {
758        let best_number = self.best_block_number()?;
759        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
760    }
761
762    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
763        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
764    }
765
766    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
767        self.storage_provider.last_block_number()
768    }
769
770    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
771        self.get_in_memory_or_storage_by_block(
772            hash.into(),
773            |db_provider| db_provider.block_number(hash),
774            |block_state| Ok(Some(block_state.number())),
775        )
776    }
777}
778
779impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
780    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
781        Ok(self.canonical_in_memory_state.pending_block_num_hash())
782    }
783
784    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
785        Ok(self.canonical_in_memory_state.get_safe_num_hash())
786    }
787
788    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
789        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
790    }
791}
792
793impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
794    type Block = BlockTy<N>;
795
796    fn find_block_by_hash(
797        &self,
798        hash: B256,
799        source: BlockSource,
800    ) -> ProviderResult<Option<Self::Block>> {
801        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
802            let Some(block) = self.get_in_memory_or_storage_by_block(
803                hash.into(),
804                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
805                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
806            )?
807        {
808            return Ok(Some(block))
809        }
810
811        if matches!(source, BlockSource::Pending | BlockSource::Any) {
812            return Ok(self
813                .canonical_in_memory_state
814                .pending_block()
815                .filter(|b| b.hash() == hash)
816                .map(|b| b.into_block()))
817        }
818
819        Ok(None)
820    }
821
822    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
823        self.get_in_memory_or_storage_by_block(
824            id,
825            |db_provider| db_provider.block(id),
826            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
827        )
828    }
829
830    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
831        Ok(self.canonical_in_memory_state.pending_recovered_block())
832    }
833
834    fn pending_block_and_receipts(
835        &self,
836    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
837        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
838    }
839
840    /// Returns the block with senders with matching number or hash from database.
841    ///
842    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
843    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
844    ///
845    /// Returns `None` if block is not found.
846    fn recovered_block(
847        &self,
848        id: BlockHashOrNumber,
849        transaction_kind: TransactionVariant,
850    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851        self.get_in_memory_or_storage_by_block(
852            id,
853            |db_provider| db_provider.recovered_block(id, transaction_kind),
854            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
855        )
856    }
857
858    fn sealed_block_with_senders(
859        &self,
860        id: BlockHashOrNumber,
861        transaction_kind: TransactionVariant,
862    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
863        self.get_in_memory_or_storage_by_block(
864            id,
865            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
866            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
867        )
868    }
869
870    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
871        self.get_in_memory_or_storage_by_block_range_while(
872            range,
873            |db_provider, range, _| db_provider.block_range(range),
874            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
875            |_| true,
876        )
877    }
878
879    fn block_with_senders_range(
880        &self,
881        range: RangeInclusive<BlockNumber>,
882    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
883        self.get_in_memory_or_storage_by_block_range_while(
884            range,
885            |db_provider, range, _| db_provider.block_with_senders_range(range),
886            |block_state, _| Some(block_state.block().recovered_block().clone()),
887            |_| true,
888        )
889    }
890
891    fn recovered_block_range(
892        &self,
893        range: RangeInclusive<BlockNumber>,
894    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
895        self.get_in_memory_or_storage_by_block_range_while(
896            range,
897            |db_provider, range, _| db_provider.recovered_block_range(range),
898            |block_state, _| Some(block_state.block().recovered_block().clone()),
899            |_| true,
900        )
901    }
902
903    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
904        self.get_in_memory_or_storage_by_tx(
905            id.into(),
906            |db_provider| db_provider.block_by_transaction_id(id),
907            |_, _, block_state| Ok(Some(block_state.number())),
908        )
909    }
910}
911
912impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
913    type Transaction = TxTy<N>;
914
915    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
916        self.get_in_memory_or_storage_by_tx(
917            tx_hash.into(),
918            |db_provider| db_provider.transaction_id(tx_hash),
919            |_, tx_number, _| Ok(Some(tx_number)),
920        )
921    }
922
923    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
924        self.get_in_memory_or_storage_by_tx(
925            id.into(),
926            |provider| provider.transaction_by_id(id),
927            |tx_index, _, block_state| {
928                Ok(block_state
929                    .block_ref()
930                    .recovered_block()
931                    .body()
932                    .transactions()
933                    .get(tx_index)
934                    .cloned())
935            },
936        )
937    }
938
939    fn transaction_by_id_unhashed(
940        &self,
941        id: TxNumber,
942    ) -> ProviderResult<Option<Self::Transaction>> {
943        self.get_in_memory_or_storage_by_tx(
944            id.into(),
945            |provider| provider.transaction_by_id_unhashed(id),
946            |tx_index, _, block_state| {
947                Ok(block_state
948                    .block_ref()
949                    .recovered_block()
950                    .body()
951                    .transactions()
952                    .get(tx_index)
953                    .cloned())
954            },
955        )
956    }
957
958    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
959        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
960            return Ok(Some(tx))
961        }
962
963        self.storage_provider.transaction_by_hash(hash)
964    }
965
966    fn transaction_by_hash_with_meta(
967        &self,
968        tx_hash: TxHash,
969    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
970        if let Some((tx, meta)) =
971            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
972        {
973            return Ok(Some((tx, meta)))
974        }
975
976        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
977    }
978
979    fn transactions_by_block(
980        &self,
981        id: BlockHashOrNumber,
982    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
983        self.get_in_memory_or_storage_by_block(
984            id,
985            |provider| provider.transactions_by_block(id),
986            |block_state| {
987                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
988            },
989        )
990    }
991
992    fn transactions_by_block_range(
993        &self,
994        range: impl RangeBounds<BlockNumber>,
995    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
996        self.get_in_memory_or_storage_by_block_range_while(
997            range,
998            |db_provider, range, _| db_provider.transactions_by_block_range(range),
999            |block_state, _| {
1000                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1001            },
1002            |_| true,
1003        )
1004    }
1005
1006    fn transactions_by_tx_range(
1007        &self,
1008        range: impl RangeBounds<TxNumber>,
1009    ) -> ProviderResult<Vec<Self::Transaction>> {
1010        self.get_in_memory_or_storage_by_tx_range(
1011            range,
1012            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1013            |index_range, block_state| {
1014                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1015                    .to_vec())
1016            },
1017        )
1018    }
1019
1020    fn senders_by_tx_range(
1021        &self,
1022        range: impl RangeBounds<TxNumber>,
1023    ) -> ProviderResult<Vec<Address>> {
1024        self.get_in_memory_or_storage_by_tx_range(
1025            range,
1026            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1027            |index_range, block_state| {
1028                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1029            },
1030        )
1031    }
1032
1033    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1034        self.get_in_memory_or_storage_by_tx(
1035            id.into(),
1036            |provider| provider.transaction_sender(id),
1037            |tx_index, _, block_state| {
1038                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1039            },
1040        )
1041    }
1042}
1043
1044impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1045    type Receipt = ReceiptTy<N>;
1046
1047    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1048        self.get_in_memory_or_storage_by_tx(
1049            id.into(),
1050            |provider| provider.receipt(id),
1051            |tx_index, _, block_state| {
1052                Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1053            },
1054        )
1055    }
1056
1057    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1058        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1059            let executed_block = block_state.block_ref();
1060            let block = executed_block.recovered_block();
1061            let receipts = block_state.executed_block_receipts_ref();
1062
1063            // assuming 1:1 correspondence between transactions and receipts
1064            debug_assert_eq!(
1065                block.body().transactions().len(),
1066                receipts.len(),
1067                "Mismatch between transaction and receipt count"
1068            );
1069
1070            if let Some(tx_index) =
1071                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1072            {
1073                // safe to use tx_index for receipts due to 1:1 correspondence
1074                return Ok(receipts.get(tx_index).cloned());
1075            }
1076        }
1077
1078        self.storage_provider.receipt_by_hash(hash)
1079    }
1080
1081    fn receipts_by_block(
1082        &self,
1083        block: BlockHashOrNumber,
1084    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1085        self.get_in_memory_or_storage_by_block(
1086            block,
1087            |db_provider| db_provider.receipts_by_block(block),
1088            |block_state| Ok(Some(block_state.executed_block_receipts())),
1089        )
1090    }
1091
1092    fn receipts_by_tx_range(
1093        &self,
1094        range: impl RangeBounds<TxNumber>,
1095    ) -> ProviderResult<Vec<Self::Receipt>> {
1096        self.get_in_memory_or_storage_by_tx_range(
1097            range,
1098            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1099            |index_range, block_state| {
1100                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1101            },
1102        )
1103    }
1104
1105    fn receipts_by_block_range(
1106        &self,
1107        block_range: RangeInclusive<BlockNumber>,
1108    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1109        self.storage_provider.receipts_by_block_range(block_range)
1110    }
1111}
1112
1113impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1114    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1115        match block {
1116            BlockId::Hash(rpc_block_hash) => {
1117                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1118                if receipts.is_none() &&
1119                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1120                    let Some(state) = self
1121                        .head_block
1122                        .as_ref()
1123                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1124                {
1125                    receipts = Some(state.executed_block_receipts());
1126                }
1127                Ok(receipts)
1128            }
1129            BlockId::Number(num_tag) => match num_tag {
1130                BlockNumberOrTag::Pending => Ok(self
1131                    .canonical_in_memory_state
1132                    .pending_state()
1133                    .map(|block_state| block_state.executed_block_receipts())),
1134                _ => {
1135                    if let Some(num) = self.convert_block_number(num_tag)? {
1136                        self.receipts_by_block(num.into())
1137                    } else {
1138                        Ok(None)
1139                    }
1140                }
1141            },
1142        }
1143    }
1144}
1145
1146impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1147    fn block_body_indices(
1148        &self,
1149        number: BlockNumber,
1150    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1151        self.get_in_memory_or_storage_by_block(
1152            number.into(),
1153            |db_provider| db_provider.block_body_indices(number),
1154            |block_state| {
1155                // Find the last block indices on database
1156                let last_storage_block_number = block_state.anchor().number;
1157                let mut stored_indices = self
1158                    .storage_provider
1159                    .block_body_indices(last_storage_block_number)?
1160                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1161
1162                // Prepare our block indices
1163                stored_indices.first_tx_num = stored_indices.next_tx_num();
1164                stored_indices.tx_count = 0;
1165
1166                // Iterate from the lowest block in memory until our target block
1167                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1168                    let block_tx_count =
1169                        state.block_ref().recovered_block().body().transactions().len() as u64;
1170                    if state.block_ref().recovered_block().number() == number {
1171                        stored_indices.tx_count = block_tx_count;
1172                    } else {
1173                        stored_indices.first_tx_num += block_tx_count;
1174                    }
1175                }
1176
1177                Ok(Some(stored_indices))
1178            },
1179        )
1180    }
1181
1182    fn block_body_indices_range(
1183        &self,
1184        range: RangeInclusive<BlockNumber>,
1185    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1186        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1187    }
1188}
1189
1190impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1191    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1192        self.storage_provider.get_stage_checkpoint(id)
1193    }
1194
1195    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1196        self.storage_provider.get_stage_checkpoint_progress(id)
1197    }
1198
1199    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1200        self.storage_provider.get_all_checkpoints()
1201    }
1202}
1203
1204impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1205    fn get_prune_checkpoint(
1206        &self,
1207        segment: PruneSegment,
1208    ) -> ProviderResult<Option<PruneCheckpoint>> {
1209        self.storage_provider.get_prune_checkpoint(segment)
1210    }
1211
1212    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1213        self.storage_provider.get_prune_checkpoints()
1214    }
1215}
1216
1217impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1218    type ChainSpec = N::ChainSpec;
1219
1220    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1221        ChainSpecProvider::chain_spec(&self.storage_provider)
1222    }
1223}
1224
1225impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1226    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1227        match id {
1228            BlockId::Number(num) => self.block_by_number_or_tag(num),
1229            BlockId::Hash(hash) => {
1230                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1231                // so not at the provider level?
1232                // if we decide to do this at a higher level, then we can make this an automatic
1233                // trait impl
1234                if Some(true) == hash.require_canonical {
1235                    // check the database, canonical blocks are only stored in the database
1236                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1237                } else {
1238                    self.block_by_hash(hash.block_hash)
1239                }
1240            }
1241        }
1242    }
1243
1244    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1245        Ok(match id {
1246            BlockNumberOrTag::Latest => {
1247                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1248            }
1249            BlockNumberOrTag::Finalized => {
1250                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1251            }
1252            BlockNumberOrTag::Safe => {
1253                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1254            }
1255            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1256            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1257
1258            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1259        })
1260    }
1261
1262    fn sealed_header_by_number_or_tag(
1263        &self,
1264        id: BlockNumberOrTag,
1265    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1266        match id {
1267            BlockNumberOrTag::Latest => {
1268                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1269            }
1270            BlockNumberOrTag::Finalized => {
1271                Ok(self.canonical_in_memory_state.get_finalized_header())
1272            }
1273            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1274            BlockNumberOrTag::Earliest => self
1275                .header_by_number(self.earliest_block_number()?)?
1276                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1277            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1278            BlockNumberOrTag::Number(num) => self
1279                .header_by_number(num)?
1280                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1281        }
1282    }
1283
1284    fn sealed_header_by_id(
1285        &self,
1286        id: BlockId,
1287    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1288        Ok(match id {
1289            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1290            BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1291        })
1292    }
1293
1294    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1295        Ok(match id {
1296            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1297            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1298        })
1299    }
1300}
1301
1302impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1303    fn storage_changeset(
1304        &self,
1305        block_number: BlockNumber,
1306    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1307        if let Some(state) =
1308            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1309        {
1310            let changesets = state
1311                .block()
1312                .execution_output
1313                .state
1314                .reverts
1315                .clone()
1316                .to_plain_state_reverts()
1317                .storage
1318                .into_iter()
1319                .flatten()
1320                .flat_map(|revert: PlainStorageRevert| {
1321                    revert.storage_revert.into_iter().map(move |(key, value)| {
1322                        let plain_key = B256::from(key.to_be_bytes());
1323                        (
1324                            BlockNumberAddress((block_number, revert.address)),
1325                            StorageEntry { key: plain_key, value: value.to_previous_value() },
1326                        )
1327                    })
1328                })
1329                .collect();
1330            Ok(changesets)
1331        } else {
1332            // Perform checks on whether or not changesets exist for the block.
1333
1334            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1335            let storage_history_exists = self
1336                .storage_provider
1337                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1338                .and_then(|checkpoint| {
1339                    // return true if the block number is ahead of the prune checkpoint.
1340                    //
1341                    // The checkpoint stores the highest pruned block number, so we should make
1342                    // sure the block_number is strictly greater.
1343                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1344                })
1345                .unwrap_or(true);
1346
1347            if !storage_history_exists {
1348                return Err(ProviderError::StateAtBlockPruned(block_number))
1349            }
1350
1351            self.storage_provider.storage_changeset(block_number)
1352        }
1353    }
1354
1355    fn get_storage_before_block(
1356        &self,
1357        block_number: BlockNumber,
1358        address: Address,
1359        storage_key: B256,
1360    ) -> ProviderResult<Option<StorageEntry>> {
1361        if let Some(state) =
1362            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1363        {
1364            let changeset = state
1365                .block_ref()
1366                .execution_output
1367                .state
1368                .reverts
1369                .clone()
1370                .to_plain_state_reverts()
1371                .storage
1372                .into_iter()
1373                .flatten()
1374                .find_map(|revert: PlainStorageRevert| {
1375                    if revert.address != address {
1376                        return None
1377                    }
1378                    revert.storage_revert.into_iter().find_map(|(key, value)| {
1379                        let plain_key = B256::from(key.to_be_bytes());
1380                        (plain_key == storage_key).then(|| StorageEntry {
1381                            key: plain_key,
1382                            value: value.to_previous_value(),
1383                        })
1384                    })
1385                });
1386            Ok(changeset)
1387        } else {
1388            let storage_history_exists = self
1389                .storage_provider
1390                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1391                .and_then(|checkpoint| {
1392                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1393                })
1394                .unwrap_or(true);
1395
1396            if !storage_history_exists {
1397                return Err(ProviderError::StateAtBlockPruned(block_number))
1398            }
1399
1400            self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1401        }
1402    }
1403
1404    fn storage_changesets_range(
1405        &self,
1406        range: impl RangeBounds<BlockNumber>,
1407    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1408        let range = to_range(range);
1409        let mut changesets = Vec::new();
1410        let database_start = range.start;
1411        let mut database_end = range.end;
1412
1413        if let Some(head_block) = &self.head_block {
1414            database_end = head_block.anchor().number;
1415
1416            for state in head_block.chain() {
1417                let block_changesets = state
1418                    .block_ref()
1419                    .execution_output
1420                    .state
1421                    .reverts
1422                    .clone()
1423                    .to_plain_state_reverts()
1424                    .storage
1425                    .into_iter()
1426                    .flatten()
1427                    .flat_map(|revert: PlainStorageRevert| {
1428                        revert.storage_revert.into_iter().map(move |(key, value)| {
1429                            let plain_key = B256::from(key.to_be_bytes());
1430                            (
1431                                BlockNumberAddress((state.number(), revert.address)),
1432                                StorageEntry { key: plain_key, value: value.to_previous_value() },
1433                            )
1434                        })
1435                    });
1436
1437                changesets.extend(block_changesets);
1438            }
1439        }
1440
1441        if database_start < database_end {
1442            let storage_history_exists = self
1443                .storage_provider
1444                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1445                .and_then(|checkpoint| {
1446                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1447                })
1448                .unwrap_or(true);
1449
1450            if !storage_history_exists {
1451                return Err(ProviderError::StateAtBlockPruned(database_start))
1452            }
1453
1454            let db_changesets = self
1455                .storage_provider
1456                .storage_changesets_range(database_start..=database_end - 1)?;
1457            changesets.extend(db_changesets);
1458        }
1459
1460        changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1461
1462        Ok(changesets)
1463    }
1464
1465    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1466        let mut count = 0;
1467        if let Some(head_block) = &self.head_block {
1468            for state in head_block.chain() {
1469                count += state
1470                    .block_ref()
1471                    .execution_output
1472                    .state
1473                    .reverts
1474                    .clone()
1475                    .to_plain_state_reverts()
1476                    .storage
1477                    .into_iter()
1478                    .flatten()
1479                    .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1480                    .sum::<usize>();
1481            }
1482        }
1483
1484        count += self.storage_provider.storage_changeset_count()?;
1485
1486        Ok(count)
1487    }
1488}
1489
1490impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1491    fn account_block_changeset(
1492        &self,
1493        block_number: BlockNumber,
1494    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1495        if let Some(state) =
1496            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1497        {
1498            let changesets = state
1499                .block_ref()
1500                .execution_output
1501                .state
1502                .reverts
1503                .clone()
1504                .to_plain_state_reverts()
1505                .accounts
1506                .into_iter()
1507                .flatten()
1508                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1509                .collect();
1510            Ok(changesets)
1511        } else {
1512            // Perform checks on whether or not changesets exist for the block.
1513
1514            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1515            let account_history_exists = self
1516                .storage_provider
1517                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1518                .and_then(|checkpoint| {
1519                    // return true if the block number is ahead of the prune checkpoint.
1520                    //
1521                    // The checkpoint stores the highest pruned block number, so we should make
1522                    // sure the block_number is strictly greater.
1523                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1524                })
1525                .unwrap_or(true);
1526
1527            if !account_history_exists {
1528                return Err(ProviderError::StateAtBlockPruned(block_number))
1529            }
1530
1531            self.storage_provider.account_block_changeset(block_number)
1532        }
1533    }
1534
1535    fn get_account_before_block(
1536        &self,
1537        block_number: BlockNumber,
1538        address: Address,
1539    ) -> ProviderResult<Option<AccountBeforeTx>> {
1540        if let Some(state) =
1541            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1542        {
1543            // Search in-memory state for the account changeset
1544            let changeset = state
1545                .block_ref()
1546                .execution_output
1547                .state
1548                .reverts
1549                .clone()
1550                .to_plain_state_reverts()
1551                .accounts
1552                .into_iter()
1553                .flatten()
1554                .find(|(addr, _)| addr == &address)
1555                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1556            Ok(changeset)
1557        } else {
1558            // Perform checks on whether or not changesets exist for the block.
1559            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1560            let account_history_exists = self
1561                .storage_provider
1562                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1563                .and_then(|checkpoint| {
1564                    // return true if the block number is ahead of the prune checkpoint.
1565                    //
1566                    // The checkpoint stores the highest pruned block number, so we should make
1567                    // sure the block_number is strictly greater.
1568                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1569                })
1570                .unwrap_or(true);
1571
1572            if !account_history_exists {
1573                return Err(ProviderError::StateAtBlockPruned(block_number))
1574            }
1575
1576            // Delegate to the storage provider for database lookups
1577            self.storage_provider.get_account_before_block(block_number, address)
1578        }
1579    }
1580
1581    fn account_changesets_range(
1582        &self,
1583        range: impl core::ops::RangeBounds<BlockNumber>,
1584    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1585        let range = to_range(range);
1586        let mut changesets = Vec::new();
1587        let database_start = range.start;
1588        let mut database_end = range.end;
1589
1590        // Check which blocks in the range are in memory
1591        if let Some(head_block) = &self.head_block {
1592            // the anchor is the end of the db range
1593            database_end = head_block.anchor().number;
1594
1595            for state in head_block.chain() {
1596                // found block in memory, collect its changesets
1597                let block_changesets = state
1598                    .block_ref()
1599                    .execution_output
1600                    .state
1601                    .reverts
1602                    .clone()
1603                    .to_plain_state_reverts()
1604                    .accounts
1605                    .into_iter()
1606                    .flatten()
1607                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1608
1609                for changeset in block_changesets {
1610                    changesets.push((state.number(), changeset));
1611                }
1612            }
1613        }
1614
1615        // get changesets from database for remaining blocks
1616        if database_start < database_end {
1617            // check if account history is pruned for these blocks
1618            let account_history_exists = self
1619                .storage_provider
1620                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1621                .and_then(|checkpoint| {
1622                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1623                })
1624                .unwrap_or(true);
1625
1626            if !account_history_exists {
1627                return Err(ProviderError::StateAtBlockPruned(database_start))
1628            }
1629
1630            let db_changesets =
1631                self.storage_provider.account_changesets_range(database_start..database_end)?;
1632            changesets.extend(db_changesets);
1633        }
1634
1635        changesets.sort_by_key(|(block_num, _)| *block_num);
1636
1637        Ok(changesets)
1638    }
1639
1640    fn account_changeset_count(&self) -> ProviderResult<usize> {
1641        // Count changesets from in-memory state
1642        let mut count = 0;
1643        if let Some(head_block) = &self.head_block {
1644            for state in head_block.chain() {
1645                count += state
1646                    .block_ref()
1647                    .execution_output
1648                    .state
1649                    .reverts
1650                    .clone()
1651                    .to_plain_state_reverts()
1652                    .accounts
1653                    .len();
1654            }
1655        }
1656
1657        // Add changesets from storage provider
1658        count += self.storage_provider.account_changeset_count()?;
1659
1660        Ok(count)
1661    }
1662}
1663
1664impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1665    /// Get basic account information.
1666    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1667        // use latest state provider
1668        let state_provider = self.latest_ref()?;
1669        state_provider.basic_account(address)
1670    }
1671}
1672
1673impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1674    type Receipt = ReceiptTy<N>;
1675
1676    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1677    ///
1678    /// If data for the block does not exist, this will return [`None`].
1679    ///
1680    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1681    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1682    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1683    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1684    /// first place.
1685    fn get_state(
1686        &self,
1687        block: BlockNumber,
1688    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1689        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1690            let state = state.block_ref().execution_outcome().clone();
1691            Ok(Some(ExecutionOutcome::from((state, block))))
1692        } else {
1693            Self::get_state(self, block..=block)
1694        }
1695    }
1696}
1697
1698#[cfg(test)]
1699mod tests {
1700    use crate::{
1701        providers::blockchain_provider::BlockchainProvider,
1702        test_utils::create_test_provider_factory, BlockWriter,
1703    };
1704    use alloy_eips::BlockHashOrNumber;
1705    use alloy_primitives::B256;
1706    use itertools::Itertools;
1707    use rand::Rng;
1708    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1709    use reth_db_api::models::AccountBeforeTx;
1710    use reth_ethereum_primitives::Block;
1711    use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1712    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1713    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1714    use reth_testing_utils::generators::{
1715        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1716    };
1717    use revm_database::BundleState;
1718    use std::{
1719        ops::{Bound, Range, RangeBounds},
1720        sync::Arc,
1721    };
1722
1723    const TEST_BLOCKS_COUNT: usize = 5;
1724
1725    fn random_blocks(
1726        rng: &mut impl Rng,
1727        database_blocks: usize,
1728        in_memory_blocks: usize,
1729        requests_count: Option<Range<u8>>,
1730        withdrawals_count: Option<Range<u8>>,
1731        tx_count: impl RangeBounds<u8>,
1732    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1733        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1734
1735        let tx_start = match tx_count.start_bound() {
1736            Bound::Included(&n) | Bound::Excluded(&n) => n,
1737            Bound::Unbounded => u8::MIN,
1738        };
1739        let tx_end = match tx_count.end_bound() {
1740            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1741            Bound::Unbounded => u8::MAX,
1742        };
1743
1744        let blocks = random_block_range(
1745            rng,
1746            0..=block_range,
1747            BlockRangeParams {
1748                parent: Some(B256::ZERO),
1749                tx_count: tx_start..tx_end,
1750                requests_count,
1751                withdrawals_count,
1752            },
1753        );
1754        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1755        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1756    }
1757
1758    #[test]
1759    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1760        // Initialize random number generator and provider factory
1761        let mut rng = generators::rng();
1762        let factory = create_test_provider_factory();
1763
1764        // Generate 10 random blocks and split into database and in-memory blocks
1765        let blocks = random_block_range(
1766            &mut rng,
1767            0..=10,
1768            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1769        );
1770        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1771
1772        // Insert first 5 blocks into the database
1773        let provider_rw = factory.provider_rw()?;
1774        for block in database_blocks {
1775            provider_rw.insert_block(
1776                &block.clone().try_recover().expect("failed to seal block with senders"),
1777            )?;
1778        }
1779        provider_rw.commit()?;
1780
1781        // Create a new provider
1782        let provider = BlockchainProvider::new(factory)?;
1783        let consistent_provider = provider.consistent_provider()?;
1784
1785        // Useful blocks
1786        let first_db_block = database_blocks.first().unwrap();
1787        let first_in_mem_block = in_memory_blocks.first().unwrap();
1788        let last_in_mem_block = in_memory_blocks.last().unwrap();
1789
1790        // No block in memory before setting in memory state
1791        assert_eq!(
1792            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1793            None
1794        );
1795        assert_eq!(
1796            consistent_provider
1797                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1798            None
1799        );
1800        // No pending block in memory
1801        assert_eq!(
1802            consistent_provider
1803                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1804            None
1805        );
1806
1807        // Insert first block into the in-memory state
1808        let in_memory_block_senders =
1809            first_in_mem_block.senders().expect("failed to recover senders");
1810        let chain = NewCanonicalChain::Commit {
1811            new: vec![ExecutedBlock {
1812                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1813                    first_in_mem_block.clone(),
1814                    in_memory_block_senders,
1815                )),
1816                ..Default::default()
1817            }],
1818        };
1819        consistent_provider.canonical_in_memory_state.update_chain(chain);
1820        let consistent_provider = provider.consistent_provider()?;
1821
1822        // Now the block should be found in memory
1823        assert_eq!(
1824            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1825            Some(first_in_mem_block.clone().into_block())
1826        );
1827        assert_eq!(
1828            consistent_provider
1829                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1830            Some(first_in_mem_block.clone().into_block())
1831        );
1832
1833        // Find the first block in database by hash
1834        assert_eq!(
1835            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1836            Some(first_db_block.clone().into_block())
1837        );
1838        assert_eq!(
1839            consistent_provider
1840                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1841            Some(first_db_block.clone().into_block())
1842        );
1843
1844        // No pending block in database
1845        assert_eq!(
1846            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1847            None
1848        );
1849
1850        // Insert the last block into the pending state
1851        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1852            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1853                last_in_mem_block.clone(),
1854                Default::default(),
1855            )),
1856            ..Default::default()
1857        });
1858
1859        // Now the last block should be found in memory
1860        assert_eq!(
1861            consistent_provider
1862                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1863            Some(last_in_mem_block.clone_block())
1864        );
1865
1866        Ok(())
1867    }
1868
1869    #[test]
1870    fn test_block_reader_block() -> eyre::Result<()> {
1871        // Initialize random number generator and provider factory
1872        let mut rng = generators::rng();
1873        let factory = create_test_provider_factory();
1874
1875        // Generate 10 random blocks and split into database and in-memory blocks
1876        let blocks = random_block_range(
1877            &mut rng,
1878            0..=10,
1879            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1880        );
1881        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1882
1883        // Insert first 5 blocks into the database
1884        let provider_rw = factory.provider_rw()?;
1885        for block in database_blocks {
1886            provider_rw.insert_block(
1887                &block.clone().try_recover().expect("failed to seal block with senders"),
1888            )?;
1889        }
1890        provider_rw.commit()?;
1891
1892        // Create a new provider
1893        let provider = BlockchainProvider::new(factory)?;
1894        let consistent_provider = provider.consistent_provider()?;
1895
1896        // First in memory block
1897        let first_in_mem_block = in_memory_blocks.first().unwrap();
1898        // First database block
1899        let first_db_block = database_blocks.first().unwrap();
1900
1901        // First in memory block should not be found yet as not integrated to the in-memory state
1902        assert_eq!(
1903            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1904            None
1905        );
1906        assert_eq!(
1907            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1908            None
1909        );
1910
1911        // Insert first block into the in-memory state
1912        let in_memory_block_senders =
1913            first_in_mem_block.senders().expect("failed to recover senders");
1914        let chain = NewCanonicalChain::Commit {
1915            new: vec![ExecutedBlock {
1916                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1917                    first_in_mem_block.clone(),
1918                    in_memory_block_senders,
1919                )),
1920                ..Default::default()
1921            }],
1922        };
1923        consistent_provider.canonical_in_memory_state.update_chain(chain);
1924
1925        let consistent_provider = provider.consistent_provider()?;
1926
1927        // First in memory block should be found
1928        assert_eq!(
1929            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1930            Some(first_in_mem_block.clone().into_block())
1931        );
1932        assert_eq!(
1933            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1934            Some(first_in_mem_block.clone().into_block())
1935        );
1936
1937        // First database block should be found
1938        assert_eq!(
1939            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1940            Some(first_db_block.clone().into_block())
1941        );
1942        assert_eq!(
1943            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1944            Some(first_db_block.clone().into_block())
1945        );
1946
1947        Ok(())
1948    }
1949
1950    #[test]
1951    fn test_changeset_reader() -> eyre::Result<()> {
1952        let mut rng = generators::rng();
1953
1954        let (database_blocks, in_memory_blocks) =
1955            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1956
1957        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1958        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1959        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1960
1961        let accounts = random_eoa_accounts(&mut rng, 2);
1962
1963        let (database_changesets, database_state) = random_changeset_range(
1964            &mut rng,
1965            &database_blocks,
1966            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1967            0..0,
1968            0..0,
1969        );
1970        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1971            &mut rng,
1972            &in_memory_blocks,
1973            database_state
1974                .iter()
1975                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1976            0..0,
1977            0..0,
1978        );
1979
1980        let factory = create_test_provider_factory();
1981
1982        let provider_rw = factory.provider_rw()?;
1983        provider_rw.append_blocks_with_state(
1984            database_blocks
1985                .into_iter()
1986                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1987                .collect(),
1988            &ExecutionOutcome {
1989                bundle: BundleState::new(
1990                    database_state.into_iter().map(|(address, (account, _))| {
1991                        (address, None, Some(account.into()), Default::default())
1992                    }),
1993                    database_changesets.iter().map(|block_changesets| {
1994                        block_changesets.iter().map(|(address, account, _)| {
1995                            (*address, Some(Some((*account).into())), [])
1996                        })
1997                    }),
1998                    Vec::new(),
1999                ),
2000                first_block: first_database_block,
2001                ..Default::default()
2002            },
2003            Default::default(),
2004        )?;
2005        provider_rw.commit()?;
2006
2007        let provider = BlockchainProvider::new(factory)?;
2008
2009        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2010        let chain = NewCanonicalChain::Commit {
2011            new: vec![in_memory_blocks
2012                .first()
2013                .map(|block| {
2014                    let senders = block.senders().expect("failed to recover senders");
2015                    ExecutedBlock {
2016                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
2017                            block.clone(),
2018                            senders,
2019                        )),
2020                        execution_output: Arc::new(BlockExecutionOutput {
2021                            state: BundleState::new(
2022                                in_memory_state.into_iter().map(|(address, (account, _))| {
2023                                    (address, None, Some(account.into()), Default::default())
2024                                }),
2025                                [in_memory_changesets.iter().map(|(address, account, _)| {
2026                                    (*address, Some(Some((*account).into())), Vec::new())
2027                                })],
2028                                [],
2029                            ),
2030                            result: BlockExecutionResult {
2031                                receipts: Default::default(),
2032                                requests: Default::default(),
2033                                gas_used: 0,
2034                                blob_gas_used: 0,
2035                            },
2036                        }),
2037                        ..Default::default()
2038                    }
2039                })
2040                .unwrap()],
2041        };
2042        provider.canonical_in_memory_state.update_chain(chain);
2043
2044        let consistent_provider = provider.consistent_provider()?;
2045
2046        assert_eq!(
2047            consistent_provider.account_block_changeset(last_database_block).unwrap(),
2048            database_changesets
2049                .into_iter()
2050                .next_back()
2051                .unwrap()
2052                .into_iter()
2053                .sorted_by_key(|(address, _, _)| *address)
2054                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2055                .collect::<Vec<_>>()
2056        );
2057        assert_eq!(
2058            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2059            in_memory_changesets
2060                .into_iter()
2061                .sorted_by_key(|(address, _, _)| *address)
2062                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2063                .collect::<Vec<_>>()
2064        );
2065
2066        Ok(())
2067    }
2068    #[test]
2069    fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
2070        use alloy_primitives::U256;
2071        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2072        use reth_primitives_traits::StorageEntry;
2073        use reth_storage_api::StorageSettingsCache;
2074        use std::collections::HashMap;
2075
2076        let address = alloy_primitives::Address::with_last_byte(1);
2077        let account = reth_primitives_traits::Account {
2078            nonce: 1,
2079            balance: U256::from(1000),
2080            bytecode_hash: None,
2081        };
2082        let slot = U256::from(0x42);
2083        let slot_b256 = B256::from(slot);
2084
2085        let mut rng = generators::rng();
2086        let factory = create_test_provider_factory();
2087        factory.set_storage_settings_cache(StorageSettings::v1());
2088
2089        let blocks = random_block_range(
2090            &mut rng,
2091            0..=1,
2092            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2093        );
2094
2095        let provider_rw = factory.provider_rw()?;
2096        provider_rw.append_blocks_with_state(
2097            blocks
2098                .into_iter()
2099                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2100                .collect(),
2101            &ExecutionOutcome {
2102                bundle: BundleState::new(
2103                    [(address, None, Some(account.into()), {
2104                        let mut s = HashMap::default();
2105                        s.insert(slot, (U256::ZERO, U256::from(100)));
2106                        s
2107                    })],
2108                    [
2109                        Vec::new(),
2110                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2111                    ],
2112                    [],
2113                ),
2114                first_block: 0,
2115                ..Default::default()
2116            },
2117            Default::default(),
2118        )?;
2119
2120        provider_rw.tx_ref().put::<tables::PlainStorageState>(
2121            address,
2122            StorageEntry { key: slot_b256, value: U256::from(100) },
2123        )?;
2124        provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
2125
2126        provider_rw.commit()?;
2127
2128        let provider = BlockchainProvider::new(factory)?;
2129        let consistent_provider = provider.consistent_provider()?;
2130
2131        let outcome =
2132            consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2133
2134        let state = &outcome.bundle.state;
2135        let account_state = state.get(&address).expect("should have account in bundle state");
2136        let storage = &account_state.storage;
2137
2138        let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2139
2140        assert_eq!(
2141            storage_slot.present_value,
2142            U256::from(100),
2143            "present_value should be 100 (the actual value in PlainStorageState)"
2144        );
2145
2146        Ok(())
2147    }
2148
2149    #[test]
2150    fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2151        use alloy_primitives::U256;
2152        use reth_db_api::models::StorageSettings;
2153        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2154        use std::collections::HashMap;
2155
2156        let mut rng = generators::rng();
2157        let factory = create_test_provider_factory();
2158        factory.set_storage_settings_cache(StorageSettings::v1());
2159
2160        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2161
2162        let address = alloy_primitives::Address::with_last_byte(1);
2163        let account = reth_primitives_traits::Account {
2164            nonce: 1,
2165            balance: U256::from(1000),
2166            bytecode_hash: None,
2167        };
2168        let slot = U256::from(0x42);
2169
2170        let provider_rw = factory.provider_rw()?;
2171        provider_rw.append_blocks_with_state(
2172            database_blocks
2173                .into_iter()
2174                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2175                .collect(),
2176            &ExecutionOutcome {
2177                bundle: BundleState::new(
2178                    [(address, None, Some(account.into()), {
2179                        let mut s = HashMap::default();
2180                        s.insert(slot, (U256::ZERO, U256::from(100)));
2181                        s
2182                    })],
2183                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2184                    [],
2185                ),
2186                first_block: 0,
2187                ..Default::default()
2188            },
2189            Default::default(),
2190        )?;
2191        provider_rw.commit()?;
2192
2193        let provider = BlockchainProvider::new(factory)?;
2194
2195        let in_mem_block = in_memory_blocks.first().unwrap();
2196        let senders = in_mem_block.senders().expect("failed to recover senders");
2197        let chain = NewCanonicalChain::Commit {
2198            new: vec![ExecutedBlock {
2199                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2200                    in_mem_block.clone(),
2201                    senders,
2202                )),
2203                execution_output: Arc::new(BlockExecutionOutput {
2204                    state: BundleState::new(
2205                        [(address, None, Some(account.into()), {
2206                            let mut s = HashMap::default();
2207                            s.insert(slot, (U256::from(100), U256::from(200)));
2208                            s
2209                        })],
2210                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2211                        [],
2212                    ),
2213                    result: BlockExecutionResult {
2214                        receipts: Default::default(),
2215                        requests: Default::default(),
2216                        gas_used: 0,
2217                        blob_gas_used: 0,
2218                    },
2219                }),
2220                ..Default::default()
2221            }],
2222        };
2223        provider.canonical_in_memory_state.update_chain(chain);
2224
2225        let consistent_provider = provider.consistent_provider()?;
2226
2227        let db_changeset = consistent_provider.storage_changeset(0)?;
2228        let mem_changeset = consistent_provider.storage_changeset(1)?;
2229
2230        let slot_b256 = B256::from(slot);
2231
2232        assert_eq!(db_changeset.len(), 1);
2233        assert_eq!(mem_changeset.len(), 1);
2234
2235        let db_key = db_changeset[0].1.key;
2236        let mem_key = mem_changeset[0].1.key;
2237
2238        assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2239        assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2240        assert_eq!(
2241            db_key, mem_key,
2242            "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2243        );
2244
2245        Ok(())
2246    }
2247
2248    #[test]
2249    fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2250        use alloy_primitives::U256;
2251        use reth_db_api::models::StorageSettings;
2252        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2253        use std::collections::HashMap;
2254
2255        let mut rng = generators::rng();
2256        let factory = create_test_provider_factory();
2257        factory.set_storage_settings_cache(StorageSettings::v1());
2258
2259        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2260
2261        let address = alloy_primitives::Address::with_last_byte(1);
2262        let account = reth_primitives_traits::Account {
2263            nonce: 1,
2264            balance: U256::from(1000),
2265            bytecode_hash: None,
2266        };
2267        let slot = U256::from(0x42);
2268
2269        let provider_rw = factory.provider_rw()?;
2270        provider_rw.append_blocks_with_state(
2271            database_blocks
2272                .into_iter()
2273                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2274                .collect(),
2275            &ExecutionOutcome {
2276                bundle: BundleState::new(
2277                    [(address, None, Some(account.into()), {
2278                        let mut s = HashMap::default();
2279                        s.insert(slot, (U256::ZERO, U256::from(100)));
2280                        s
2281                    })],
2282                    vec![
2283                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2284                        vec![],
2285                    ],
2286                    [],
2287                ),
2288                first_block: 0,
2289                ..Default::default()
2290            },
2291            Default::default(),
2292        )?;
2293        provider_rw.commit()?;
2294
2295        let provider = BlockchainProvider::new(factory)?;
2296
2297        let in_mem_block = in_memory_blocks.first().unwrap();
2298        let senders = in_mem_block.senders().expect("failed to recover senders");
2299        let chain = NewCanonicalChain::Commit {
2300            new: vec![ExecutedBlock {
2301                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2302                    in_mem_block.clone(),
2303                    senders,
2304                )),
2305                execution_output: Arc::new(BlockExecutionOutput {
2306                    state: BundleState::new(
2307                        [(address, None, Some(account.into()), {
2308                            let mut s = HashMap::default();
2309                            s.insert(slot, (U256::from(100), U256::from(200)));
2310                            s
2311                        })],
2312                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2313                        [],
2314                    ),
2315                    result: BlockExecutionResult {
2316                        receipts: Default::default(),
2317                        requests: Default::default(),
2318                        gas_used: 0,
2319                        blob_gas_used: 0,
2320                    },
2321                }),
2322                ..Default::default()
2323            }],
2324        };
2325        provider.canonical_in_memory_state.update_chain(chain);
2326
2327        let consistent_provider = provider.consistent_provider()?;
2328
2329        let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2330
2331        assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2332
2333        let slot_b256 = B256::from(slot);
2334        let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2335
2336        assert_eq!(
2337            keys[0], keys[1],
2338            "same logical slot should produce identical keys whether from DB or memory"
2339        );
2340        assert_eq!(
2341            keys[0], slot_b256,
2342            "keys should be plain/unhashed when use_hashed_state is false"
2343        );
2344
2345        Ok(())
2346    }
2347}