Skip to main content

reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5    BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8    TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13    HashOrNumber,
14};
15use alloy_primitives::{
16    map::{hash_map, HashMap},
17    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{
25    Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry, StorageSlotKey,
26};
27use reth_prune_types::{PruneCheckpoint, PruneSegment};
28use reth_stages_types::{StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31    BlockBodyIndicesProvider, ChangesetEntry, DatabaseProviderFactory, NodePrimitivesProvider,
32    StateProvider, StateProviderBox, StorageChangeSetReader, StorageSettingsCache,
33    TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use revm_database::states::PlainStorageRevert;
37use std::{
38    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
39    sync::Arc,
40};
41use tracing::trace;
42
/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
/// this provider.
///
/// The snapshot is made of two parts captured in [`ConsistentProvider::new`]: the head of the
/// in-memory chain and a read-only database provider.
///
/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
/// time-out.
#[derive(Debug)]
#[doc(hidden)] // triggers ICE for `cargo docs`
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Storage provider, obtained once via `database_provider_ro` when [`Self`] is created.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head block at time of [`Self`] creation. `None` means there was no in-memory head state, in
    /// which case lookups fall through to `storage_provider`.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
59
60impl<N: ProviderNodeTypes> ConsistentProvider<N> {
61    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
62    ///
63    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
64    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
65    /// view of memory and database.
66    pub fn new(
67        storage_provider_factory: ProviderFactory<N>,
68        state: CanonicalInMemoryState<N::Primitives>,
69    ) -> ProviderResult<Self> {
70        // Each one provides a snapshot at the time of instantiation, but its order matters.
71        //
72        // If we acquire first the database provider, it's possible that before the in-memory chain
73        // snapshot is instantiated, it will flush blocks to disk. This would
74        // mean that our database provider would not have access to the flushed blocks (since it's
75        // working under an older view), while the in-memory state may have deleted them
76        // entirely. Resulting in gaps on the range.
77        let head_block = state.head_state();
78        let storage_provider = storage_provider_factory.database_provider_ro()?;
79        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
80    }
81
82    // Helper function to convert range bounds
83    fn convert_range_bounds<T>(
84        &self,
85        range: impl RangeBounds<T>,
86        end_unbounded: impl FnOnce() -> T,
87    ) -> (T, T)
88    where
89        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
90    {
91        let start = match range.start_bound() {
92            Bound::Included(&n) => n,
93            Bound::Excluded(&n) => n + T::from(1u8),
94            Bound::Unbounded => T::from(0u8),
95        };
96
97        let end = match range.end_bound() {
98            Bound::Included(&n) => n,
99            Bound::Excluded(&n) => n - T::from(1u8),
100            Bound::Unbounded => end_unbounded(),
101        };
102
103        (start, end)
104    }
105
106    /// Storage provider for latest block
107    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
108        trace!(target: "providers::blockchain", "Getting latest block state provider");
109
110        // use latest state provider if the head state exists
111        if let Some(state) = &self.head_block {
112            trace!(target: "providers::blockchain", "Using head state for latest state provider");
113            Ok(self.block_state_provider_ref(state)?.boxed())
114        } else {
115            trace!(target: "providers::blockchain", "Using database state for latest state provider");
116            Ok(self.storage_provider.latest())
117        }
118    }
119
120    fn history_by_block_hash_ref<'a>(
121        &'a self,
122        block_hash: BlockHash,
123    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
124        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
125
126        self.get_in_memory_or_storage_by_block(
127            block_hash.into(),
128            |_| self.storage_provider.history_by_block_hash(block_hash),
129            |block_state| {
130                let state_provider = self.block_state_provider_ref(block_state)?;
131                Ok(Box::new(state_provider))
132            },
133        )
134    }
135
136    /// Returns a state provider indexed by the given block number or tag.
137    fn state_by_block_number_ref<'a>(
138        &'a self,
139        number: BlockNumber,
140    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
141        let hash =
142            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
143        self.history_by_block_hash_ref(hash)
144    }
145
146    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
147    ///
148    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
149    pub fn get_state(
150        &self,
151        range: RangeInclusive<BlockNumber>,
152    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
153        if range.is_empty() {
154            return Ok(None)
155        }
156        let start_block_number = *range.start();
157        let end_block_number = *range.end();
158
159        // We are not removing block meta as it is used to get block changesets.
160        let mut block_bodies = Vec::new();
161        for block_num in range.clone() {
162            let block_body = self
163                .block_body_indices(block_num)?
164                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
165            block_bodies.push((block_num, block_body))
166        }
167
168        // get transaction receipts
169        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
170        else {
171            return Ok(None)
172        };
173        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
174            return Ok(None)
175        };
176
177        let mut account_changeset = Vec::new();
178        for block_num in range.clone() {
179            let changeset =
180                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
181            account_changeset.extend(changeset);
182        }
183
184        let mut storage_changeset = Vec::new();
185        for block_num in range {
186            let changeset = self.storage_changeset(block_num)?;
187            storage_changeset.extend(changeset);
188        }
189
190        let (state, reverts) =
191            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
192
193        let mut receipt_iter =
194            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
195
196        let mut receipts = Vec::with_capacity(block_bodies.len());
197        // loop break if we are at the end of the blocks.
198        for (_, block_body) in block_bodies {
199            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
200            for tx_num in block_body.tx_num_range() {
201                let receipt = receipt_iter
202                    .next()
203                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
204                block_receipts.push(receipt);
205            }
206            receipts.push(block_receipts);
207        }
208
209        Ok(Some(ExecutionOutcome::new_init(
210            state,
211            reverts,
212            // We skip new contracts since we never delete them from the database
213            Vec::new(),
214            receipts,
215            start_block_number,
216            Vec::new(),
217        )))
218    }
219
    /// Populate a [`BundleStateInit`] and [`RevertsInit`] based on the given storage and account
    /// changesets.
    ///
    /// "New" values (account info and storage slots) are read from the state provider at
    /// `block_range_end`; "old" values come from the changesets. Both changesets are walked in
    /// reverse, and an already present entry only has its old value overwritten, so the old value
    /// that finally remains is the one from the entry that comes first in the input.
    ///
    /// The storage lookup is selected per entry by the variant of its key: a
    /// [`StorageSlotKey::Hashed`] key is resolved with `storage_by_hashed_key`, a
    /// [`StorageSlotKey::Plain`] key with [`StateProvider::storage`]. A missing slot value is
    /// treated as the default value.
    ///
    /// # Errors
    ///
    /// Fails if the state provider for `block_range_end` cannot be created or if any account or
    /// storage read fails.
    fn populate_bundle_state(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
        block_range_end: BlockNumber,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();
        let state_provider = self.state_by_block_number_ref(block_range_end)?;

        // add account changeset changes
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First time this address is seen: pair the old info with the info at the end
                    // of the range and start with an empty storage map.
                    let new_info = state_provider.basic_account(&address)?;
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // overwrite old account state.
                    entry.get_mut().0 = old_info;
                }
            }
            // insert old info into reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // add storage changeset changes
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // get account state or insert from plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // No account changeset touched this address, so its info is recorded as
                    // unchanged: old and new are the same value.
                    let present_info = state_provider.basic_account(&address)?;
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // match storage.
            match account_state.2.entry(old_storage.key.as_b256()) {
                hash_map::Entry::Vacant(entry) => {
                    // Read the slot value at the end of the range using the lookup that matches
                    // the key representation.
                    let new_storage_value = match old_storage.key {
                        StorageSlotKey::Hashed(_) => state_provider
                            .storage_by_hashed_key(address, old_storage.key.as_b256())?
                            .unwrap_or_default(),
                        StorageSlotKey::Plain(_) => state_provider
                            .storage(address, old_storage.key.as_b256())?
                            .unwrap_or_default(),
                    };
                    entry.insert((old_storage.value, new_storage_value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Slot already tracked: only the old value is replaced.
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Record the old slot value in the reverts of the block it was changed in.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(StorageEntry::from(old_storage));
        }

        Ok((state, reverts))
    }
294
    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
    /// is met.
    ///
    /// Works on the in-memory chain reachable from the head block and on the database provider
    /// held by this instance. Splits the range into in-memory and storage sections, prioritizing
    /// recent in-memory blocks in case of overlaps. Items are returned in ascending block order:
    /// storage items first, in-memory items after.
    ///
    /// * `range`: block range to fetch. An unbounded end resolves to the highest in-memory block,
    ///   or to the last database block number if there is no in-memory chain.
    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
    ///   user to retrieve the required items from the database using [`RangeInclusive`].
    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
    ///   state, allowing for selection or filtering for the desired data. Returning `None` stops
    ///   the iteration.
    /// * `predicate` (`P`) is handed by mutable reference to both functions above; it is never
    ///   called directly here.
    ///
    /// Returns an empty vector if the normalized range is empty (`start > end`).
    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_db_range: F,
        map_block_state_item: G,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<BlockNumber>,
            &mut P,
        ) -> ProviderResult<Vec<T>>,
        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
        P: FnMut(&T) -> bool,
    {
        // Both views used below are the ones stored on `self`: the chain reachable from
        // `head_block` and `storage_provider`.
        //
        // The chain is collected from the head downwards, so its first element is the highest
        // block and its last element the lowest one.
        let mut in_memory_chain =
            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
        let db_provider = &self.storage_provider;

        let (start, end) = self.convert_range_bounds(range, || {
            // the first block is the highest one.
            in_memory_chain
                .first()
                .map(|b| b.number())
                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
        });

        if start > end {
            return Ok(vec![])
        }

        // Split range into storage_range and in-memory range. If the in-memory range is not
        // necessary drop it early.
        //
        // The last block of `in_memory_chain` is the lowest block number.
        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
            Some(lowest_memory_block) if lowest_memory_block <= end => {
                let highest_memory_block =
                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

                // Database will for a time overlap with in-memory-chain blocks. In
                // case of a re-org, it can mean that the database blocks are of a forked chain, and
                // so, we should prioritize the in-memory overlapped blocks.
                let in_memory_range =
                    lowest_memory_block.max(start)..=end.min(highest_memory_block);

                // If requested range is in the middle of the in-memory range, remove the necessary
                // lowest blocks
                in_memory_chain.truncate(
                    in_memory_chain
                        .len()
                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
                );

                // Only blocks strictly below the lowest in-memory block are read from storage.
                let storage_range =
                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

                (Some((in_memory_chain, in_memory_range)), storage_range)
            }
            _ => {
                // Drop the in-memory chain so we don't hold blocks in memory.
                drop(in_memory_chain);

                (None, Some(start..=end))
            }
        };

        // Sized for the whole requested range (`end - start + 1` items).
        let mut items = Vec::with_capacity((end - start + 1) as usize);

        if let Some(storage_range) = storage_range {
            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
            items.append(&mut db_items);

            // The predicate was not met, if the number of items differs from the expected. So, we
            // return what we have.
            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
                return Ok(items)
            }
        }

        if let Some((in_memory_chain, in_memory_range)) = in_memory {
            // The chain is stored highest-first, so it is reversed to walk it in ascending order
            // alongside the ascending block numbers of `in_memory_range`.
            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
                debug_assert!(num == block.number());
                if let Some(item) = map_block_state_item(block, &mut predicate) {
                    items.push(item);
                } else {
                    break
                }
            }
        }

        Ok(items)
    }
407
408    /// This uses a given [`BlockState`] to initialize a state provider for that block.
409    fn block_state_provider_ref(
410        &self,
411        state: &BlockState<N::Primitives>,
412    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
413        let anchor_hash = state.anchor().hash;
414        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
415        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
416        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
417    }
418
    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
    ///
    /// Note that although `range` is typed with [`BlockNumber`], its bounds are interpreted as
    /// transaction numbers: they are compared against transaction numbers and forwarded to
    /// `fetch_from_db` as a [`RangeInclusive`] of [`TxNumber`].
    ///
    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
    ///   The indices are positions inside the block's transaction list.
    ///
    /// Items are returned in ascending transaction order: storage items first, followed by the
    /// in-memory blocks from lowest to highest. Returns an empty vector if the normalized range is
    /// empty (`start > end`).
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if the body indices of the last
    /// database block are missing, and propagates any error of the two fetch functions.
    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_from_db: S,
        fetch_from_block_state: M,
    ) -> ProviderResult<Vec<R>>
    where
        S: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<TxNumber>,
        ) -> ProviderResult<Vec<R>>,
        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
    {
        // In-memory chain collected from the head downwards: highest block first, lowest last.
        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
        let provider = &self.storage_provider;

        // Get the last block number stored in the storage which does NOT overlap with in-memory
        // chain.
        let last_database_block_number = in_mem_chain
            .last()
            .map(|b| Ok(b.anchor().number))
            .unwrap_or_else(|| provider.last_block_number())?;

        // Get the next tx number for the last block stored in the storage, which marks the start of
        // the in-memory state.
        let last_block_body_index = provider
            .block_body_indices(last_database_block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
        let mut in_memory_tx_num = last_block_body_index.next_tx_num();

        // An unbounded end resolves to the last transaction number of the in-memory chain: the
        // last database tx number plus the number of in-memory transactions.
        let (start, end) = self.convert_range_bounds(range, || {
            in_mem_chain
                .iter()
                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
                .sum::<u64>() +
                last_block_body_index.last_tx_num()
        });

        if start > end {
            return Ok(vec![])
        }

        let mut tx_range = start..=end;

        // If the range is entirely before the first in-memory transaction number, fetch from
        // storage
        if *tx_range.end() < in_memory_tx_num {
            return fetch_from_db(provider, tx_range);
        }

        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

        // If the range spans storage and memory, get elements from storage first.
        if *tx_range.start() < in_memory_tx_num {
            // Determine the range that needs to be fetched from storage.
            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

            // Set the remaining transaction range for in-memory
            tx_range = in_memory_tx_num..=*tx_range.end();

            items.extend(fetch_from_db(provider, db_range)?);
        }

        // Iterate from the lowest block to the highest in-memory chain
        //
        // `in_memory_tx_num` is the transaction number of the first transaction of the block
        // currently visited and is advanced by the block's transaction count on every step.
        for block_state in in_mem_chain.iter().rev() {
            let block_tx_count =
                block_state.block_ref().recovered_block().body().transactions().len();
            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

            // If the transaction range start is equal or higher than the next block first
            // transaction, advance
            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
                in_memory_tx_num += block_tx_count as u64;
                continue
            }

            // This should only be more than 0 once, in case of a partial range inside a block.
            let skip = (tx_range.start() - in_memory_tx_num) as usize;

            // Inclusive index range inside this block: starts at `skip` and covers at most the
            // rest of the block or the rest of the requested range, whichever is shorter.
            items.extend(fetch_from_block_state(
                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
                block_state,
            )?);

            in_memory_tx_num += block_tx_count as u64;

            // Break if the range has been fully processed
            if in_memory_tx_num > *tx_range.end() {
                break
            }

            // Set updated range
            tx_range = in_memory_tx_num..=*tx_range.end();
        }

        Ok(items)
    }
521
522    /// Fetches data from either in-memory state or persistent storage by transaction
523    /// [`HashOrNumber`].
524    fn get_in_memory_or_storage_by_tx<S, M, R>(
525        &self,
526        id: HashOrNumber,
527        fetch_from_db: S,
528        fetch_from_block_state: M,
529    ) -> ProviderResult<Option<R>>
530    where
531        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
532        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
533    {
534        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
535        let provider = &self.storage_provider;
536
537        // Get the last block number stored in the database which does NOT overlap with in-memory
538        // chain.
539        let last_database_block_number = in_mem_chain
540            .last()
541            .map(|b| Ok(b.anchor().number))
542            .unwrap_or_else(|| provider.last_block_number())?;
543
544        // Get the next tx number for the last block stored in the database and consider it the
545        // first tx number of the in-memory state
546        let last_block_body_index = provider
547            .block_body_indices(last_database_block_number)?
548            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
549        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
550
551        // If the transaction number is less than the first in-memory transaction number, make a
552        // database lookup
553        if let HashOrNumber::Number(id) = id &&
554            id < in_memory_tx_num
555        {
556            return fetch_from_db(provider)
557        }
558
559        // Iterate from the lowest block to the highest
560        for block_state in in_mem_chain.iter().rev() {
561            let executed_block = block_state.block_ref();
562            let block = executed_block.recovered_block();
563
564            for tx_index in 0..block.body().transactions().len() {
565                match id {
566                    HashOrNumber::Hash(tx_hash) => {
567                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
568                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
569                        }
570                    }
571                    HashOrNumber::Number(id) => {
572                        if id == in_memory_tx_num {
573                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
574                        }
575                    }
576                }
577
578                in_memory_tx_num += 1;
579            }
580        }
581
582        // Not found in-memory, so check database.
583        if let HashOrNumber::Hash(_) = id {
584            return fetch_from_db(provider)
585        }
586
587        Ok(None)
588    }
589
590    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
591    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
592        &self,
593        id: BlockHashOrNumber,
594        fetch_from_db: S,
595        fetch_from_block_state: M,
596    ) -> ProviderResult<R>
597    where
598        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
599        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
600    {
601        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
602            return fetch_from_block_state(block_state)
603        }
604        fetch_from_db(&self.storage_provider)
605    }
606
607    /// Consumes the provider and returns a state provider for the specific block hash.
608    pub(crate) fn into_state_provider_at_block_hash(
609        self,
610        block_hash: BlockHash,
611    ) -> ProviderResult<StateProviderBox> {
612        let Self { storage_provider, head_block, .. } = self;
613        let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
614            let block_number = storage_provider
615                .block_number(block_hash)?
616                .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
617            storage_provider.try_into_history_at_block(block_number)
618        };
619        if let Some(Some(block_state)) =
620            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
621        {
622            let anchor_hash = block_state.anchor().hash;
623            let latest_historical = into_history_at_block_hash(anchor_hash)?;
624            return Ok(Box::new(block_state.state_provider(latest_historical)));
625        }
626        into_history_at_block_hash(block_hash)
627    }
628}
629
630impl<N: ProviderNodeTypes> ConsistentProvider<N> {
631    /// Ensures that the given block number is canonical (synced)
632    ///
633    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
634    /// out of range and would lead to invalid results, mainly during initial sync.
635    ///
636    /// Verifying the `block_number` would be expensive since we need to lookup sync table
637    /// Instead, we ensure that the `block_number` is within the range of the
638    /// [`Self::best_block_number`] which is updated when a block is synced.
639    #[inline]
640    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
641        let latest = self.best_block_number()?;
642        if block_number > latest {
643            Err(ProviderError::HeaderNotFound(block_number.into()))
644        } else {
645            Ok(())
646        }
647    }
648}
649
// The primitives exposed by the provider are the ones of the node types it is generic over.
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    type Primitives = N::Primitives;
}
653
// Static file access is forwarded unchanged to the inner storage provider.
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the inner storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }

    /// Returns the static file writer of the inner storage provider for the given block and
    /// segment.
    fn get_static_file_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
        self.storage_provider.get_static_file_writer(block, segment)
    }
}
667
668impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
669    type Header = HeaderTy<N>;
670
671    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
672        self.get_in_memory_or_storage_by_block(
673            block_hash.into(),
674            |db_provider| db_provider.header(block_hash),
675            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
676        )
677    }
678
679    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
680        self.get_in_memory_or_storage_by_block(
681            num.into(),
682            |db_provider| db_provider.header_by_number(num),
683            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
684        )
685    }
686
687    fn headers_range(
688        &self,
689        range: impl RangeBounds<BlockNumber>,
690    ) -> ProviderResult<Vec<Self::Header>> {
691        self.get_in_memory_or_storage_by_block_range_while(
692            range,
693            |db_provider, range, _| db_provider.headers_range(range),
694            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
695            |_| true,
696        )
697    }
698
699    fn sealed_header(
700        &self,
701        number: BlockNumber,
702    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
703        self.get_in_memory_or_storage_by_block(
704            number.into(),
705            |db_provider| db_provider.sealed_header(number),
706            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
707        )
708    }
709
710    fn sealed_headers_range(
711        &self,
712        range: impl RangeBounds<BlockNumber>,
713    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
714        self.get_in_memory_or_storage_by_block_range_while(
715            range,
716            |db_provider, range, _| db_provider.sealed_headers_range(range),
717            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
718            |_| true,
719        )
720    }
721
722    fn sealed_headers_while(
723        &self,
724        range: impl RangeBounds<BlockNumber>,
725        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
726    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
727        self.get_in_memory_or_storage_by_block_range_while(
728            range,
729            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
730            |block_state, predicate| {
731                let header = block_state.block_ref().recovered_block().sealed_header();
732                predicate(header).then(|| header.clone())
733            },
734            predicate,
735        )
736    }
737}
738
739impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
740    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
741        self.get_in_memory_or_storage_by_block(
742            number.into(),
743            |db_provider| db_provider.block_hash(number),
744            |block_state| Ok(Some(block_state.hash())),
745        )
746    }
747
748    fn canonical_hashes_range(
749        &self,
750        start: BlockNumber,
751        end: BlockNumber,
752    ) -> ProviderResult<Vec<B256>> {
753        self.get_in_memory_or_storage_by_block_range_while(
754            start..end,
755            |db_provider, inclusive_range, _| {
756                db_provider
757                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
758            },
759            |block_state, _| Some(block_state.hash()),
760            |_| true,
761        )
762    }
763}
764
765impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
766    fn chain_info(&self) -> ProviderResult<ChainInfo> {
767        let best_number = self.best_block_number()?;
768        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
769    }
770
771    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
772        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
773    }
774
775    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
776        self.storage_provider.last_block_number()
777    }
778
779    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
780        self.get_in_memory_or_storage_by_block(
781            hash.into(),
782            |db_provider| db_provider.block_number(hash),
783            |block_state| Ok(Some(block_state.number())),
784        )
785    }
786}
787
788impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
789    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
790        Ok(self.canonical_in_memory_state.pending_block_num_hash())
791    }
792
793    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
794        Ok(self.canonical_in_memory_state.get_safe_num_hash())
795    }
796
797    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
798        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
799    }
800}
801
802impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
803    type Block = BlockTy<N>;
804
805    fn find_block_by_hash(
806        &self,
807        hash: B256,
808        source: BlockSource,
809    ) -> ProviderResult<Option<Self::Block>> {
810        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
811            let Some(block) = self.get_in_memory_or_storage_by_block(
812                hash.into(),
813                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
814                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
815            )?
816        {
817            return Ok(Some(block))
818        }
819
820        if matches!(source, BlockSource::Pending | BlockSource::Any) {
821            return Ok(self
822                .canonical_in_memory_state
823                .pending_block()
824                .filter(|b| b.hash() == hash)
825                .map(|b| b.into_block()))
826        }
827
828        Ok(None)
829    }
830
831    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
832        self.get_in_memory_or_storage_by_block(
833            id,
834            |db_provider| db_provider.block(id),
835            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
836        )
837    }
838
839    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
840        Ok(self.canonical_in_memory_state.pending_recovered_block())
841    }
842
843    fn pending_block_and_receipts(
844        &self,
845    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
846        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
847    }
848
849    /// Returns the block with senders with matching number or hash from database.
850    ///
851    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
852    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
853    ///
854    /// Returns `None` if block is not found.
855    fn recovered_block(
856        &self,
857        id: BlockHashOrNumber,
858        transaction_kind: TransactionVariant,
859    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860        self.get_in_memory_or_storage_by_block(
861            id,
862            |db_provider| db_provider.recovered_block(id, transaction_kind),
863            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864        )
865    }
866
867    fn sealed_block_with_senders(
868        &self,
869        id: BlockHashOrNumber,
870        transaction_kind: TransactionVariant,
871    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
872        self.get_in_memory_or_storage_by_block(
873            id,
874            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
875            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
876        )
877    }
878
879    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
880        self.get_in_memory_or_storage_by_block_range_while(
881            range,
882            |db_provider, range, _| db_provider.block_range(range),
883            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
884            |_| true,
885        )
886    }
887
888    fn block_with_senders_range(
889        &self,
890        range: RangeInclusive<BlockNumber>,
891    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892        self.get_in_memory_or_storage_by_block_range_while(
893            range,
894            |db_provider, range, _| db_provider.block_with_senders_range(range),
895            |block_state, _| Some(block_state.block().recovered_block().clone()),
896            |_| true,
897        )
898    }
899
900    fn recovered_block_range(
901        &self,
902        range: RangeInclusive<BlockNumber>,
903    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
904        self.get_in_memory_or_storage_by_block_range_while(
905            range,
906            |db_provider, range, _| db_provider.recovered_block_range(range),
907            |block_state, _| Some(block_state.block().recovered_block().clone()),
908            |_| true,
909        )
910    }
911
912    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
913        self.get_in_memory_or_storage_by_tx(
914            id.into(),
915            |db_provider| db_provider.block_by_transaction_id(id),
916            |_, _, block_state| Ok(Some(block_state.number())),
917        )
918    }
919}
920
921impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
922    type Transaction = TxTy<N>;
923
924    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
925        self.get_in_memory_or_storage_by_tx(
926            tx_hash.into(),
927            |db_provider| db_provider.transaction_id(tx_hash),
928            |_, tx_number, _| Ok(Some(tx_number)),
929        )
930    }
931
932    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
933        self.get_in_memory_or_storage_by_tx(
934            id.into(),
935            |provider| provider.transaction_by_id(id),
936            |tx_index, _, block_state| {
937                Ok(block_state
938                    .block_ref()
939                    .recovered_block()
940                    .body()
941                    .transactions()
942                    .get(tx_index)
943                    .cloned())
944            },
945        )
946    }
947
948    fn transaction_by_id_unhashed(
949        &self,
950        id: TxNumber,
951    ) -> ProviderResult<Option<Self::Transaction>> {
952        self.get_in_memory_or_storage_by_tx(
953            id.into(),
954            |provider| provider.transaction_by_id_unhashed(id),
955            |tx_index, _, block_state| {
956                Ok(block_state
957                    .block_ref()
958                    .recovered_block()
959                    .body()
960                    .transactions()
961                    .get(tx_index)
962                    .cloned())
963            },
964        )
965    }
966
967    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
968        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
969            return Ok(Some(tx))
970        }
971
972        self.storage_provider.transaction_by_hash(hash)
973    }
974
975    fn transaction_by_hash_with_meta(
976        &self,
977        tx_hash: TxHash,
978    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
979        if let Some((tx, meta)) =
980            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
981        {
982            return Ok(Some((tx, meta)))
983        }
984
985        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
986    }
987
988    fn transactions_by_block(
989        &self,
990        id: BlockHashOrNumber,
991    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
992        self.get_in_memory_or_storage_by_block(
993            id,
994            |provider| provider.transactions_by_block(id),
995            |block_state| {
996                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
997            },
998        )
999    }
1000
1001    fn transactions_by_block_range(
1002        &self,
1003        range: impl RangeBounds<BlockNumber>,
1004    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1005        self.get_in_memory_or_storage_by_block_range_while(
1006            range,
1007            |db_provider, range, _| db_provider.transactions_by_block_range(range),
1008            |block_state, _| {
1009                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1010            },
1011            |_| true,
1012        )
1013    }
1014
1015    fn transactions_by_tx_range(
1016        &self,
1017        range: impl RangeBounds<TxNumber>,
1018    ) -> ProviderResult<Vec<Self::Transaction>> {
1019        self.get_in_memory_or_storage_by_tx_range(
1020            range,
1021            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1022            |index_range, block_state| {
1023                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1024                    .to_vec())
1025            },
1026        )
1027    }
1028
1029    fn senders_by_tx_range(
1030        &self,
1031        range: impl RangeBounds<TxNumber>,
1032    ) -> ProviderResult<Vec<Address>> {
1033        self.get_in_memory_or_storage_by_tx_range(
1034            range,
1035            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1036            |index_range, block_state| {
1037                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1038            },
1039        )
1040    }
1041
1042    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1043        self.get_in_memory_or_storage_by_tx(
1044            id.into(),
1045            |provider| provider.transaction_sender(id),
1046            |tx_index, _, block_state| {
1047                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1048            },
1049        )
1050    }
1051}
1052
1053impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1054    type Receipt = ReceiptTy<N>;
1055
1056    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1057        self.get_in_memory_or_storage_by_tx(
1058            id.into(),
1059            |provider| provider.receipt(id),
1060            |tx_index, _, block_state| {
1061                Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1062            },
1063        )
1064    }
1065
1066    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1067        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1068            let executed_block = block_state.block_ref();
1069            let block = executed_block.recovered_block();
1070            let receipts = block_state.executed_block_receipts_ref();
1071
1072            // assuming 1:1 correspondence between transactions and receipts
1073            debug_assert_eq!(
1074                block.body().transactions().len(),
1075                receipts.len(),
1076                "Mismatch between transaction and receipt count"
1077            );
1078
1079            if let Some(tx_index) =
1080                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1081            {
1082                // safe to use tx_index for receipts due to 1:1 correspondence
1083                return Ok(receipts.get(tx_index).cloned());
1084            }
1085        }
1086
1087        self.storage_provider.receipt_by_hash(hash)
1088    }
1089
1090    fn receipts_by_block(
1091        &self,
1092        block: BlockHashOrNumber,
1093    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1094        self.get_in_memory_or_storage_by_block(
1095            block,
1096            |db_provider| db_provider.receipts_by_block(block),
1097            |block_state| Ok(Some(block_state.executed_block_receipts())),
1098        )
1099    }
1100
1101    fn receipts_by_tx_range(
1102        &self,
1103        range: impl RangeBounds<TxNumber>,
1104    ) -> ProviderResult<Vec<Self::Receipt>> {
1105        self.get_in_memory_or_storage_by_tx_range(
1106            range,
1107            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1108            |index_range, block_state| {
1109                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1110            },
1111        )
1112    }
1113
1114    fn receipts_by_block_range(
1115        &self,
1116        block_range: RangeInclusive<BlockNumber>,
1117    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1118        self.storage_provider.receipts_by_block_range(block_range)
1119    }
1120}
1121
1122impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1123    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1124        match block {
1125            BlockId::Hash(rpc_block_hash) => {
1126                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1127                if receipts.is_none() &&
1128                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1129                    let Some(state) = self
1130                        .head_block
1131                        .as_ref()
1132                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1133                {
1134                    receipts = Some(state.executed_block_receipts());
1135                }
1136                Ok(receipts)
1137            }
1138            BlockId::Number(num_tag) => match num_tag {
1139                BlockNumberOrTag::Pending => Ok(self
1140                    .canonical_in_memory_state
1141                    .pending_state()
1142                    .map(|block_state| block_state.executed_block_receipts())),
1143                _ => {
1144                    if let Some(num) = self.convert_block_number(num_tag)? {
1145                        self.receipts_by_block(num.into())
1146                    } else {
1147                        Ok(None)
1148                    }
1149                }
1150            },
1151        }
1152    }
1153}
1154
1155impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1156    fn block_body_indices(
1157        &self,
1158        number: BlockNumber,
1159    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1160        self.get_in_memory_or_storage_by_block(
1161            number.into(),
1162            |db_provider| db_provider.block_body_indices(number),
1163            |block_state| {
1164                // Find the last block indices on database
1165                let last_storage_block_number = block_state.anchor().number;
1166                let mut stored_indices = self
1167                    .storage_provider
1168                    .block_body_indices(last_storage_block_number)?
1169                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1170
1171                // Prepare our block indices
1172                stored_indices.first_tx_num = stored_indices.next_tx_num();
1173                stored_indices.tx_count = 0;
1174
1175                // Iterate from the lowest block in memory until our target block
1176                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1177                    let block_tx_count =
1178                        state.block_ref().recovered_block().body().transactions().len() as u64;
1179                    if state.block_ref().recovered_block().number() == number {
1180                        stored_indices.tx_count = block_tx_count;
1181                    } else {
1182                        stored_indices.first_tx_num += block_tx_count;
1183                    }
1184                }
1185
1186                Ok(Some(stored_indices))
1187            },
1188        )
1189    }
1190
1191    fn block_body_indices_range(
1192        &self,
1193        range: RangeInclusive<BlockNumber>,
1194    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1195        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1196    }
1197}
1198
1199impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1200    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1201        self.storage_provider.get_stage_checkpoint(id)
1202    }
1203
1204    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1205        self.storage_provider.get_stage_checkpoint_progress(id)
1206    }
1207
1208    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1209        self.storage_provider.get_all_checkpoints()
1210    }
1211}
1212
1213impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1214    fn get_prune_checkpoint(
1215        &self,
1216        segment: PruneSegment,
1217    ) -> ProviderResult<Option<PruneCheckpoint>> {
1218        self.storage_provider.get_prune_checkpoint(segment)
1219    }
1220
1221    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1222        self.storage_provider.get_prune_checkpoints()
1223    }
1224}
1225
1226impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1227    type ChainSpec = N::ChainSpec;
1228
1229    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1230        ChainSpecProvider::chain_spec(&self.storage_provider)
1231    }
1232}
1233
1234impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1235    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1236        match id {
1237            BlockId::Number(num) => self.block_by_number_or_tag(num),
1238            BlockId::Hash(hash) => {
1239                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1240                // so not at the provider level?
1241                // if we decide to do this at a higher level, then we can make this an automatic
1242                // trait impl
1243                if Some(true) == hash.require_canonical {
1244                    // check the database, canonical blocks are only stored in the database
1245                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1246                } else {
1247                    self.block_by_hash(hash.block_hash)
1248                }
1249            }
1250        }
1251    }
1252
1253    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1254        Ok(match id {
1255            BlockNumberOrTag::Latest => {
1256                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1257            }
1258            BlockNumberOrTag::Finalized => {
1259                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1260            }
1261            BlockNumberOrTag::Safe => {
1262                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1263            }
1264            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1265            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1266
1267            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1268        })
1269    }
1270
1271    fn sealed_header_by_number_or_tag(
1272        &self,
1273        id: BlockNumberOrTag,
1274    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1275        match id {
1276            BlockNumberOrTag::Latest => {
1277                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1278            }
1279            BlockNumberOrTag::Finalized => {
1280                Ok(self.canonical_in_memory_state.get_finalized_header())
1281            }
1282            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1283            BlockNumberOrTag::Earliest => self
1284                .header_by_number(self.earliest_block_number()?)?
1285                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1286            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1287            BlockNumberOrTag::Number(num) => self
1288                .header_by_number(num)?
1289                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1290        }
1291    }
1292
1293    fn sealed_header_by_id(
1294        &self,
1295        id: BlockId,
1296    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1297        Ok(match id {
1298            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1299            BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1300        })
1301    }
1302
1303    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1304        Ok(match id {
1305            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1306            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1307        })
1308    }
1309}
1310
1311impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1312    fn storage_changeset(
1313        &self,
1314        block_number: BlockNumber,
1315    ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1316        let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1317        if let Some(state) =
1318            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1319        {
1320            let changesets = state
1321                .block()
1322                .execution_output
1323                .state
1324                .reverts
1325                .clone()
1326                .to_plain_state_reverts()
1327                .storage
1328                .into_iter()
1329                .flatten()
1330                .flat_map(|revert: PlainStorageRevert| {
1331                    revert.storage_revert.into_iter().map(move |(key, value)| {
1332                        let tagged_key = StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1333                        (
1334                            BlockNumberAddress((block_number, revert.address)),
1335                            ChangesetEntry { key: tagged_key, value: value.to_previous_value() },
1336                        )
1337                    })
1338                })
1339                .collect();
1340            Ok(changesets)
1341        } else {
1342            // Perform checks on whether or not changesets exist for the block.
1343
1344            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1345            let storage_history_exists = self
1346                .storage_provider
1347                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1348                .and_then(|checkpoint| {
1349                    // return true if the block number is ahead of the prune checkpoint.
1350                    //
1351                    // The checkpoint stores the highest pruned block number, so we should make
1352                    // sure the block_number is strictly greater.
1353                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1354                })
1355                .unwrap_or(true);
1356
1357            if !storage_history_exists {
1358                return Err(ProviderError::StateAtBlockPruned(block_number))
1359            }
1360
1361            self.storage_provider.storage_changeset(block_number)
1362        }
1363    }
1364
1365    fn get_storage_before_block(
1366        &self,
1367        block_number: BlockNumber,
1368        address: Address,
1369        storage_key: B256,
1370    ) -> ProviderResult<Option<ChangesetEntry>> {
1371        let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1372        if let Some(state) =
1373            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1374        {
1375            let changeset = state
1376                .block_ref()
1377                .execution_output
1378                .state
1379                .reverts
1380                .clone()
1381                .to_plain_state_reverts()
1382                .storage
1383                .into_iter()
1384                .flatten()
1385                .find_map(|revert: PlainStorageRevert| {
1386                    if revert.address != address {
1387                        return None
1388                    }
1389                    revert.storage_revert.into_iter().find_map(|(key, value)| {
1390                        let tagged_key = StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1391                        (tagged_key.as_b256() == storage_key).then(|| ChangesetEntry {
1392                            key: tagged_key,
1393                            value: value.to_previous_value(),
1394                        })
1395                    })
1396                });
1397            Ok(changeset)
1398        } else {
1399            let storage_history_exists = self
1400                .storage_provider
1401                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1402                .and_then(|checkpoint| {
1403                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1404                })
1405                .unwrap_or(true);
1406
1407            if !storage_history_exists {
1408                return Err(ProviderError::StateAtBlockPruned(block_number))
1409            }
1410
1411            self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1412        }
1413    }
1414
1415    fn storage_changesets_range(
1416        &self,
1417        range: impl RangeBounds<BlockNumber>,
1418    ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1419        let range = to_range(range);
1420        let mut changesets = Vec::new();
1421        let database_start = range.start;
1422        let mut database_end = range.end;
1423
1424        let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1425
1426        if let Some(head_block) = &self.head_block {
1427            database_end = head_block.anchor().number;
1428
1429            let chain = head_block.chain().collect::<Vec<_>>();
1430            for state in chain {
1431                let block_changesets = state
1432                    .block_ref()
1433                    .execution_output
1434                    .state
1435                    .reverts
1436                    .clone()
1437                    .to_plain_state_reverts()
1438                    .storage
1439                    .into_iter()
1440                    .flatten()
1441                    .flat_map(|revert: PlainStorageRevert| {
1442                        revert.storage_revert.into_iter().map(move |(key, value)| {
1443                            let tagged_key =
1444                                StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1445                            (
1446                                BlockNumberAddress((state.number(), revert.address)),
1447                                ChangesetEntry {
1448                                    key: tagged_key,
1449                                    value: value.to_previous_value(),
1450                                },
1451                            )
1452                        })
1453                    });
1454
1455                changesets.extend(block_changesets);
1456            }
1457        }
1458
1459        if database_start < database_end {
1460            let storage_history_exists = self
1461                .storage_provider
1462                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1463                .and_then(|checkpoint| {
1464                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1465                })
1466                .unwrap_or(true);
1467
1468            if !storage_history_exists {
1469                return Err(ProviderError::StateAtBlockPruned(database_start))
1470            }
1471
1472            let db_changesets = self
1473                .storage_provider
1474                .storage_changesets_range(database_start..=database_end - 1)?;
1475            changesets.extend(db_changesets);
1476        }
1477
1478        changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1479
1480        Ok(changesets)
1481    }
1482
1483    fn storage_changeset_count(&self) -> ProviderResult<usize> {
1484        let mut count = 0;
1485        if let Some(head_block) = &self.head_block {
1486            for state in head_block.chain() {
1487                count += state
1488                    .block_ref()
1489                    .execution_output
1490                    .state
1491                    .reverts
1492                    .clone()
1493                    .to_plain_state_reverts()
1494                    .storage
1495                    .into_iter()
1496                    .flatten()
1497                    .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1498                    .sum::<usize>();
1499            }
1500        }
1501
1502        count += self.storage_provider.storage_changeset_count()?;
1503
1504        Ok(count)
1505    }
1506}
1507
1508impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1509    fn account_block_changeset(
1510        &self,
1511        block_number: BlockNumber,
1512    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1513        if let Some(state) =
1514            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1515        {
1516            let changesets = state
1517                .block_ref()
1518                .execution_output
1519                .state
1520                .reverts
1521                .clone()
1522                .to_plain_state_reverts()
1523                .accounts
1524                .into_iter()
1525                .flatten()
1526                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1527                .collect();
1528            Ok(changesets)
1529        } else {
1530            // Perform checks on whether or not changesets exist for the block.
1531
1532            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1533            let account_history_exists = self
1534                .storage_provider
1535                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1536                .and_then(|checkpoint| {
1537                    // return true if the block number is ahead of the prune checkpoint.
1538                    //
1539                    // The checkpoint stores the highest pruned block number, so we should make
1540                    // sure the block_number is strictly greater.
1541                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1542                })
1543                .unwrap_or(true);
1544
1545            if !account_history_exists {
1546                return Err(ProviderError::StateAtBlockPruned(block_number))
1547            }
1548
1549            self.storage_provider.account_block_changeset(block_number)
1550        }
1551    }
1552
1553    fn get_account_before_block(
1554        &self,
1555        block_number: BlockNumber,
1556        address: Address,
1557    ) -> ProviderResult<Option<AccountBeforeTx>> {
1558        if let Some(state) =
1559            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1560        {
1561            // Search in-memory state for the account changeset
1562            let changeset = state
1563                .block_ref()
1564                .execution_output
1565                .state
1566                .reverts
1567                .clone()
1568                .to_plain_state_reverts()
1569                .accounts
1570                .into_iter()
1571                .flatten()
1572                .find(|(addr, _)| addr == &address)
1573                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1574            Ok(changeset)
1575        } else {
1576            // Perform checks on whether or not changesets exist for the block.
1577            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1578            let account_history_exists = self
1579                .storage_provider
1580                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1581                .and_then(|checkpoint| {
1582                    // return true if the block number is ahead of the prune checkpoint.
1583                    //
1584                    // The checkpoint stores the highest pruned block number, so we should make
1585                    // sure the block_number is strictly greater.
1586                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1587                })
1588                .unwrap_or(true);
1589
1590            if !account_history_exists {
1591                return Err(ProviderError::StateAtBlockPruned(block_number))
1592            }
1593
1594            // Delegate to the storage provider for database lookups
1595            self.storage_provider.get_account_before_block(block_number, address)
1596        }
1597    }
1598
1599    fn account_changesets_range(
1600        &self,
1601        range: impl core::ops::RangeBounds<BlockNumber>,
1602    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1603        let range = to_range(range);
1604        let mut changesets = Vec::new();
1605        let database_start = range.start;
1606        let mut database_end = range.end;
1607
1608        // Check which blocks in the range are in memory
1609        if let Some(head_block) = &self.head_block {
1610            // the anchor is the end of the db range
1611            database_end = head_block.anchor().number;
1612
1613            let chain = head_block.chain().collect::<Vec<_>>();
1614            for state in chain {
1615                // found block in memory, collect its changesets
1616                let block_changesets = state
1617                    .block_ref()
1618                    .execution_output
1619                    .state
1620                    .reverts
1621                    .clone()
1622                    .to_plain_state_reverts()
1623                    .accounts
1624                    .into_iter()
1625                    .flatten()
1626                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1627
1628                for changeset in block_changesets {
1629                    changesets.push((state.number(), changeset));
1630                }
1631            }
1632        }
1633
1634        // get changesets from database for remaining blocks
1635        if database_start < database_end {
1636            // check if account history is pruned for these blocks
1637            let account_history_exists = self
1638                .storage_provider
1639                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1640                .and_then(|checkpoint| {
1641                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1642                })
1643                .unwrap_or(true);
1644
1645            if !account_history_exists {
1646                return Err(ProviderError::StateAtBlockPruned(database_start))
1647            }
1648
1649            let db_changesets =
1650                self.storage_provider.account_changesets_range(database_start..database_end)?;
1651            changesets.extend(db_changesets);
1652        }
1653
1654        changesets.sort_by_key(|(block_num, _)| *block_num);
1655
1656        Ok(changesets)
1657    }
1658
1659    fn account_changeset_count(&self) -> ProviderResult<usize> {
1660        // Count changesets from in-memory state
1661        let mut count = 0;
1662        if let Some(head_block) = &self.head_block {
1663            for state in head_block.chain() {
1664                count += state
1665                    .block_ref()
1666                    .execution_output
1667                    .state
1668                    .reverts
1669                    .clone()
1670                    .to_plain_state_reverts()
1671                    .accounts
1672                    .len();
1673            }
1674        }
1675
1676        // Add changesets from storage provider
1677        count += self.storage_provider.account_changeset_count()?;
1678
1679        Ok(count)
1680    }
1681}
1682
1683impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1684    /// Get basic account information.
1685    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1686        // use latest state provider
1687        let state_provider = self.latest_ref()?;
1688        state_provider.basic_account(address)
1689    }
1690}
1691
1692impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1693    type Receipt = ReceiptTy<N>;
1694
1695    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1696    ///
1697    /// If data for the block does not exist, this will return [`None`].
1698    ///
1699    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1700    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1701    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1702    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1703    /// first place.
1704    fn get_state(
1705        &self,
1706        block: BlockNumber,
1707    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1708        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1709            let state = state.block_ref().execution_outcome().clone();
1710            Ok(Some(ExecutionOutcome::from((state, block))))
1711        } else {
1712            Self::get_state(self, block..=block)
1713        }
1714    }
1715}
1716
1717#[cfg(test)]
1718mod tests {
1719    use crate::{
1720        providers::blockchain_provider::BlockchainProvider,
1721        test_utils::create_test_provider_factory, BlockWriter,
1722    };
1723    use alloy_eips::BlockHashOrNumber;
1724    use alloy_primitives::B256;
1725    use itertools::Itertools;
1726    use rand::Rng;
1727    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1728    use reth_db_api::models::AccountBeforeTx;
1729    use reth_ethereum_primitives::Block;
1730    use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1731    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1732    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1733    use reth_testing_utils::generators::{
1734        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1735    };
1736    use revm_database::BundleState;
1737    use std::{
1738        ops::{Bound, Range, RangeBounds},
1739        sync::Arc,
1740    };
1741
1742    const TEST_BLOCKS_COUNT: usize = 5;
1743
1744    fn random_blocks(
1745        rng: &mut impl Rng,
1746        database_blocks: usize,
1747        in_memory_blocks: usize,
1748        requests_count: Option<Range<u8>>,
1749        withdrawals_count: Option<Range<u8>>,
1750        tx_count: impl RangeBounds<u8>,
1751    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1752        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1753
1754        let tx_start = match tx_count.start_bound() {
1755            Bound::Included(&n) | Bound::Excluded(&n) => n,
1756            Bound::Unbounded => u8::MIN,
1757        };
1758        let tx_end = match tx_count.end_bound() {
1759            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1760            Bound::Unbounded => u8::MAX,
1761        };
1762
1763        let blocks = random_block_range(
1764            rng,
1765            0..=block_range,
1766            BlockRangeParams {
1767                parent: Some(B256::ZERO),
1768                tx_count: tx_start..tx_end,
1769                requests_count,
1770                withdrawals_count,
1771            },
1772        );
1773        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1774        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1775    }
1776
1777    #[test]
1778    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1779        // Initialize random number generator and provider factory
1780        let mut rng = generators::rng();
1781        let factory = create_test_provider_factory();
1782
1783        // Generate 10 random blocks and split into database and in-memory blocks
1784        let blocks = random_block_range(
1785            &mut rng,
1786            0..=10,
1787            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1788        );
1789        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1790
1791        // Insert first 5 blocks into the database
1792        let provider_rw = factory.provider_rw()?;
1793        for block in database_blocks {
1794            provider_rw.insert_block(
1795                &block.clone().try_recover().expect("failed to seal block with senders"),
1796            )?;
1797        }
1798        provider_rw.commit()?;
1799
1800        // Create a new provider
1801        let provider = BlockchainProvider::new(factory)?;
1802        let consistent_provider = provider.consistent_provider()?;
1803
1804        // Useful blocks
1805        let first_db_block = database_blocks.first().unwrap();
1806        let first_in_mem_block = in_memory_blocks.first().unwrap();
1807        let last_in_mem_block = in_memory_blocks.last().unwrap();
1808
1809        // No block in memory before setting in memory state
1810        assert_eq!(
1811            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1812            None
1813        );
1814        assert_eq!(
1815            consistent_provider
1816                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1817            None
1818        );
1819        // No pending block in memory
1820        assert_eq!(
1821            consistent_provider
1822                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1823            None
1824        );
1825
1826        // Insert first block into the in-memory state
1827        let in_memory_block_senders =
1828            first_in_mem_block.senders().expect("failed to recover senders");
1829        let chain = NewCanonicalChain::Commit {
1830            new: vec![ExecutedBlock {
1831                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1832                    first_in_mem_block.clone(),
1833                    in_memory_block_senders,
1834                )),
1835                ..Default::default()
1836            }],
1837        };
1838        consistent_provider.canonical_in_memory_state.update_chain(chain);
1839        let consistent_provider = provider.consistent_provider()?;
1840
1841        // Now the block should be found in memory
1842        assert_eq!(
1843            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1844            Some(first_in_mem_block.clone().into_block())
1845        );
1846        assert_eq!(
1847            consistent_provider
1848                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1849            Some(first_in_mem_block.clone().into_block())
1850        );
1851
1852        // Find the first block in database by hash
1853        assert_eq!(
1854            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1855            Some(first_db_block.clone().into_block())
1856        );
1857        assert_eq!(
1858            consistent_provider
1859                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1860            Some(first_db_block.clone().into_block())
1861        );
1862
1863        // No pending block in database
1864        assert_eq!(
1865            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1866            None
1867        );
1868
1869        // Insert the last block into the pending state
1870        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1871            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1872                last_in_mem_block.clone(),
1873                Default::default(),
1874            )),
1875            ..Default::default()
1876        });
1877
1878        // Now the last block should be found in memory
1879        assert_eq!(
1880            consistent_provider
1881                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1882            Some(last_in_mem_block.clone_block())
1883        );
1884
1885        Ok(())
1886    }
1887
1888    #[test]
1889    fn test_block_reader_block() -> eyre::Result<()> {
1890        // Initialize random number generator and provider factory
1891        let mut rng = generators::rng();
1892        let factory = create_test_provider_factory();
1893
1894        // Generate 10 random blocks and split into database and in-memory blocks
1895        let blocks = random_block_range(
1896            &mut rng,
1897            0..=10,
1898            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1899        );
1900        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1901
1902        // Insert first 5 blocks into the database
1903        let provider_rw = factory.provider_rw()?;
1904        for block in database_blocks {
1905            provider_rw.insert_block(
1906                &block.clone().try_recover().expect("failed to seal block with senders"),
1907            )?;
1908        }
1909        provider_rw.commit()?;
1910
1911        // Create a new provider
1912        let provider = BlockchainProvider::new(factory)?;
1913        let consistent_provider = provider.consistent_provider()?;
1914
1915        // First in memory block
1916        let first_in_mem_block = in_memory_blocks.first().unwrap();
1917        // First database block
1918        let first_db_block = database_blocks.first().unwrap();
1919
1920        // First in memory block should not be found yet as not integrated to the in-memory state
1921        assert_eq!(
1922            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1923            None
1924        );
1925        assert_eq!(
1926            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1927            None
1928        );
1929
1930        // Insert first block into the in-memory state
1931        let in_memory_block_senders =
1932            first_in_mem_block.senders().expect("failed to recover senders");
1933        let chain = NewCanonicalChain::Commit {
1934            new: vec![ExecutedBlock {
1935                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1936                    first_in_mem_block.clone(),
1937                    in_memory_block_senders,
1938                )),
1939                ..Default::default()
1940            }],
1941        };
1942        consistent_provider.canonical_in_memory_state.update_chain(chain);
1943
1944        let consistent_provider = provider.consistent_provider()?;
1945
1946        // First in memory block should be found
1947        assert_eq!(
1948            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1949            Some(first_in_mem_block.clone().into_block())
1950        );
1951        assert_eq!(
1952            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1953            Some(first_in_mem_block.clone().into_block())
1954        );
1955
1956        // First database block should be found
1957        assert_eq!(
1958            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1959            Some(first_db_block.clone().into_block())
1960        );
1961        assert_eq!(
1962            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1963            Some(first_db_block.clone().into_block())
1964        );
1965
1966        Ok(())
1967    }
1968
1969    #[test]
1970    fn test_changeset_reader() -> eyre::Result<()> {
1971        let mut rng = generators::rng();
1972
1973        let (database_blocks, in_memory_blocks) =
1974            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1975
1976        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1977        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1978        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1979
1980        let accounts = random_eoa_accounts(&mut rng, 2);
1981
1982        let (database_changesets, database_state) = random_changeset_range(
1983            &mut rng,
1984            &database_blocks,
1985            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1986            0..0,
1987            0..0,
1988        );
1989        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1990            &mut rng,
1991            &in_memory_blocks,
1992            database_state
1993                .iter()
1994                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1995            0..0,
1996            0..0,
1997        );
1998
1999        let factory = create_test_provider_factory();
2000
2001        let provider_rw = factory.provider_rw()?;
2002        provider_rw.append_blocks_with_state(
2003            database_blocks
2004                .into_iter()
2005                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2006                .collect(),
2007            &ExecutionOutcome {
2008                bundle: BundleState::new(
2009                    database_state.into_iter().map(|(address, (account, _))| {
2010                        (address, None, Some(account.into()), Default::default())
2011                    }),
2012                    database_changesets.iter().map(|block_changesets| {
2013                        block_changesets.iter().map(|(address, account, _)| {
2014                            (*address, Some(Some((*account).into())), [])
2015                        })
2016                    }),
2017                    Vec::new(),
2018                ),
2019                first_block: first_database_block,
2020                ..Default::default()
2021            },
2022            Default::default(),
2023        )?;
2024        provider_rw.commit()?;
2025
2026        let provider = BlockchainProvider::new(factory)?;
2027
2028        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2029        let chain = NewCanonicalChain::Commit {
2030            new: vec![in_memory_blocks
2031                .first()
2032                .map(|block| {
2033                    let senders = block.senders().expect("failed to recover senders");
2034                    ExecutedBlock {
2035                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
2036                            block.clone(),
2037                            senders,
2038                        )),
2039                        execution_output: Arc::new(BlockExecutionOutput {
2040                            state: BundleState::new(
2041                                in_memory_state.into_iter().map(|(address, (account, _))| {
2042                                    (address, None, Some(account.into()), Default::default())
2043                                }),
2044                                [in_memory_changesets.iter().map(|(address, account, _)| {
2045                                    (*address, Some(Some((*account).into())), Vec::new())
2046                                })],
2047                                [],
2048                            ),
2049                            result: BlockExecutionResult {
2050                                receipts: Default::default(),
2051                                requests: Default::default(),
2052                                gas_used: 0,
2053                                blob_gas_used: 0,
2054                            },
2055                        }),
2056                        ..Default::default()
2057                    }
2058                })
2059                .unwrap()],
2060        };
2061        provider.canonical_in_memory_state.update_chain(chain);
2062
2063        let consistent_provider = provider.consistent_provider()?;
2064
2065        assert_eq!(
2066            consistent_provider.account_block_changeset(last_database_block).unwrap(),
2067            database_changesets
2068                .into_iter()
2069                .next_back()
2070                .unwrap()
2071                .into_iter()
2072                .sorted_by_key(|(address, _, _)| *address)
2073                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2074                .collect::<Vec<_>>()
2075        );
2076        assert_eq!(
2077            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2078            in_memory_changesets
2079                .into_iter()
2080                .sorted_by_key(|(address, _, _)| *address)
2081                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2082                .collect::<Vec<_>>()
2083        );
2084
2085        Ok(())
2086    }
2087
2088    #[test]
2089    fn test_get_state_storage_value_hashed_state() -> eyre::Result<()> {
2090        use alloy_primitives::{keccak256, U256};
2091        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2092        use reth_primitives_traits::StorageEntry;
2093        use reth_storage_api::StorageSettingsCache;
2094        use std::collections::HashMap;
2095
2096        let address = alloy_primitives::Address::with_last_byte(1);
2097        let account = reth_primitives_traits::Account {
2098            nonce: 1,
2099            balance: U256::from(1000),
2100            bytecode_hash: None,
2101        };
2102        let slot = U256::from(0x42);
2103        let slot_b256 = B256::from(slot);
2104        let hashed_address = keccak256(address);
2105        let hashed_slot = keccak256(slot_b256);
2106
2107        let mut rng = generators::rng();
2108        let factory = create_test_provider_factory();
2109        factory.set_storage_settings_cache(StorageSettings::v2());
2110
2111        let blocks = random_block_range(
2112            &mut rng,
2113            0..=1,
2114            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2115        );
2116
2117        let provider_rw = factory.provider_rw()?;
2118        provider_rw.append_blocks_with_state(
2119            blocks
2120                .into_iter()
2121                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2122                .collect(),
2123            &ExecutionOutcome {
2124                bundle: BundleState::new(
2125                    [(address, None, Some(account.into()), {
2126                        let mut s = HashMap::default();
2127                        s.insert(slot, (U256::ZERO, U256::from(100)));
2128                        s
2129                    })],
2130                    [
2131                        Vec::new(),
2132                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2133                    ],
2134                    [],
2135                ),
2136                first_block: 0,
2137                ..Default::default()
2138            },
2139            Default::default(),
2140        )?;
2141
2142        provider_rw.tx_ref().put::<tables::HashedStorages>(
2143            hashed_address,
2144            StorageEntry { key: hashed_slot, value: U256::from(100) },
2145        )?;
2146        provider_rw.tx_ref().put::<tables::HashedAccounts>(hashed_address, account)?;
2147
2148        provider_rw.commit()?;
2149
2150        let provider = BlockchainProvider::new(factory)?;
2151        let consistent_provider = provider.consistent_provider()?;
2152
2153        let outcome =
2154            consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2155
2156        let state = &outcome.bundle.state;
2157        let account_state = state.get(&address).expect("should have account in bundle state");
2158        let storage = &account_state.storage;
2159
2160        let slot_as_u256 = U256::from_be_bytes(*hashed_slot);
2161        let storage_slot = storage.get(&slot_as_u256).expect("should have the slot in storage");
2162
2163        assert_eq!(
2164            storage_slot.present_value,
2165            U256::from(100),
2166            "present_value should be 100 (the actual value in HashedStorages)"
2167        );
2168
2169        Ok(())
2170    }
2171
2172    #[test]
2173    #[cfg(all(unix, feature = "rocksdb"))]
2174    fn test_get_state_storage_value_hashed_state_historical() -> eyre::Result<()> {
2175        use alloy_primitives::{keccak256, U256};
2176        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2177        use reth_primitives_traits::StorageEntry;
2178        use reth_storage_api::StorageSettingsCache;
2179        use std::collections::HashMap;
2180
2181        let address = alloy_primitives::Address::with_last_byte(1);
2182        let account = reth_primitives_traits::Account {
2183            nonce: 1,
2184            balance: U256::from(1000),
2185            bytecode_hash: None,
2186        };
2187        let slot = U256::from(0x42);
2188        let slot_b256 = B256::from(slot);
2189        let hashed_address = keccak256(address);
2190        let hashed_slot = keccak256(slot_b256);
2191
2192        let mut rng = generators::rng();
2193        let factory = create_test_provider_factory();
2194        factory.set_storage_settings_cache(StorageSettings::v2());
2195
2196        let blocks = random_block_range(
2197            &mut rng,
2198            0..=3,
2199            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2200        );
2201
2202        let provider_rw = factory.provider_rw()?;
2203        provider_rw.append_blocks_with_state(
2204            blocks
2205                .into_iter()
2206                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2207                .collect(),
2208            &ExecutionOutcome {
2209                bundle: BundleState::new(
2210                    [(address, None, Some(account.into()), {
2211                        let mut s = HashMap::default();
2212                        s.insert(slot, (U256::ZERO, U256::from(300)));
2213                        s
2214                    })],
2215                    [
2216                        Vec::new(),
2217                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2218                        vec![(address, Some(Some(account.into())), vec![(slot, U256::from(100))])],
2219                        vec![(address, Some(Some(account.into())), vec![(slot, U256::from(200))])],
2220                    ],
2221                    [],
2222                ),
2223                first_block: 0,
2224                ..Default::default()
2225            },
2226            Default::default(),
2227        )?;
2228
2229        provider_rw.tx_ref().put::<tables::HashedStorages>(
2230            hashed_address,
2231            StorageEntry { key: hashed_slot, value: U256::from(300) },
2232        )?;
2233        provider_rw.tx_ref().put::<tables::HashedAccounts>(hashed_address, account)?;
2234
2235        provider_rw.commit()?;
2236
2237        let provider = BlockchainProvider::new(factory)?;
2238        let consistent_provider = provider.consistent_provider()?;
2239
2240        let outcome =
2241            consistent_provider.get_state(1..=2)?.expect("should return execution outcome");
2242
2243        let state = &outcome.bundle.state;
2244        let account_state = state.get(&address).expect("should have account in bundle state");
2245        let storage = &account_state.storage;
2246
2247        let slot_as_u256 = U256::from_be_bytes(*hashed_slot);
2248        let storage_slot = storage.get(&slot_as_u256).expect("should have the slot in storage");
2249
2250        assert_eq!(
2251            storage_slot.present_value,
2252            U256::from(200),
2253            "present_value should be 200 (the value at block 2, not 300 which is the latest)"
2254        );
2255
2256        Ok(())
2257    }
2258
2259    #[test]
2260    fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
2261        use alloy_primitives::U256;
2262        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2263        use reth_primitives_traits::StorageEntry;
2264        use reth_storage_api::StorageSettingsCache;
2265        use std::collections::HashMap;
2266
2267        let address = alloy_primitives::Address::with_last_byte(1);
2268        let account = reth_primitives_traits::Account {
2269            nonce: 1,
2270            balance: U256::from(1000),
2271            bytecode_hash: None,
2272        };
2273        let slot = U256::from(0x42);
2274        let slot_b256 = B256::from(slot);
2275
2276        let mut rng = generators::rng();
2277        let factory = create_test_provider_factory();
2278        factory.set_storage_settings_cache(StorageSettings::v1());
2279
2280        let blocks = random_block_range(
2281            &mut rng,
2282            0..=1,
2283            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2284        );
2285
2286        let provider_rw = factory.provider_rw()?;
2287        provider_rw.append_blocks_with_state(
2288            blocks
2289                .into_iter()
2290                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2291                .collect(),
2292            &ExecutionOutcome {
2293                bundle: BundleState::new(
2294                    [(address, None, Some(account.into()), {
2295                        let mut s = HashMap::default();
2296                        s.insert(slot, (U256::ZERO, U256::from(100)));
2297                        s
2298                    })],
2299                    [
2300                        Vec::new(),
2301                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2302                    ],
2303                    [],
2304                ),
2305                first_block: 0,
2306                ..Default::default()
2307            },
2308            Default::default(),
2309        )?;
2310
2311        provider_rw.tx_ref().put::<tables::PlainStorageState>(
2312            address,
2313            StorageEntry { key: slot_b256, value: U256::from(100) },
2314        )?;
2315        provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
2316
2317        provider_rw.commit()?;
2318
2319        let provider = BlockchainProvider::new(factory)?;
2320        let consistent_provider = provider.consistent_provider()?;
2321
2322        let outcome =
2323            consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2324
2325        let state = &outcome.bundle.state;
2326        let account_state = state.get(&address).expect("should have account in bundle state");
2327        let storage = &account_state.storage;
2328
2329        let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2330
2331        assert_eq!(
2332            storage_slot.present_value,
2333            U256::from(100),
2334            "present_value should be 100 (the actual value in PlainStorageState)"
2335        );
2336
2337        Ok(())
2338    }
2339
2340    #[test]
2341    fn test_storage_changeset_consistent_keys_hashed_state() -> eyre::Result<()> {
2342        use alloy_primitives::{keccak256, U256};
2343        use reth_db_api::models::StorageSettings;
2344        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2345        use std::collections::HashMap;
2346
2347        let mut rng = generators::rng();
2348        let factory = create_test_provider_factory();
2349        factory.set_storage_settings_cache(StorageSettings::v2());
2350
2351        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2352
2353        let address = alloy_primitives::Address::with_last_byte(1);
2354        let account = reth_primitives_traits::Account {
2355            nonce: 1,
2356            balance: U256::from(1000),
2357            bytecode_hash: None,
2358        };
2359        let slot = U256::from(0x42);
2360
2361        let provider_rw = factory.provider_rw()?;
2362        provider_rw.append_blocks_with_state(
2363            database_blocks
2364                .into_iter()
2365                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2366                .collect(),
2367            &ExecutionOutcome {
2368                bundle: BundleState::new(
2369                    [(address, None, Some(account.into()), {
2370                        let mut s = HashMap::default();
2371                        s.insert(slot, (U256::ZERO, U256::from(100)));
2372                        s
2373                    })],
2374                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2375                    [],
2376                ),
2377                first_block: 0,
2378                ..Default::default()
2379            },
2380            Default::default(),
2381        )?;
2382        provider_rw.commit()?;
2383
2384        let provider = BlockchainProvider::new(factory)?;
2385
2386        let in_mem_block = in_memory_blocks.first().unwrap();
2387        let senders = in_mem_block.senders().expect("failed to recover senders");
2388        let chain = NewCanonicalChain::Commit {
2389            new: vec![ExecutedBlock {
2390                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2391                    in_mem_block.clone(),
2392                    senders,
2393                )),
2394                execution_output: Arc::new(BlockExecutionOutput {
2395                    state: BundleState::new(
2396                        [(address, None, Some(account.into()), {
2397                            let mut s = HashMap::default();
2398                            s.insert(slot, (U256::from(100), U256::from(200)));
2399                            s
2400                        })],
2401                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2402                        [],
2403                    ),
2404                    result: BlockExecutionResult {
2405                        receipts: Default::default(),
2406                        requests: Default::default(),
2407                        gas_used: 0,
2408                        blob_gas_used: 0,
2409                    },
2410                }),
2411                ..Default::default()
2412            }],
2413        };
2414        provider.canonical_in_memory_state.update_chain(chain);
2415
2416        let consistent_provider = provider.consistent_provider()?;
2417
2418        let db_changeset = consistent_provider.storage_changeset(0)?;
2419        let mem_changeset = consistent_provider.storage_changeset(1)?;
2420
2421        let slot_b256 = B256::from(slot);
2422        let _hashed_slot_b256 = keccak256(slot_b256);
2423
2424        assert_eq!(db_changeset.len(), 1);
2425        assert_eq!(mem_changeset.len(), 1);
2426
2427        let db_key = db_changeset[0].1.key;
2428        let mem_key = mem_changeset[0].1.key;
2429
2430        assert_eq!(
2431            db_key, mem_key,
2432            "DB and in-memory changesets should return the same key format (hashed) for the same logical slot"
2433        );
2434
2435        Ok(())
2436    }
2437
2438    #[test]
2439    fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2440        use alloy_primitives::U256;
2441        use reth_db_api::models::StorageSettings;
2442        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2443        use std::collections::HashMap;
2444
2445        let mut rng = generators::rng();
2446        let factory = create_test_provider_factory();
2447        factory.set_storage_settings_cache(StorageSettings::v1());
2448
2449        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2450
2451        let address = alloy_primitives::Address::with_last_byte(1);
2452        let account = reth_primitives_traits::Account {
2453            nonce: 1,
2454            balance: U256::from(1000),
2455            bytecode_hash: None,
2456        };
2457        let slot = U256::from(0x42);
2458
2459        let provider_rw = factory.provider_rw()?;
2460        provider_rw.append_blocks_with_state(
2461            database_blocks
2462                .into_iter()
2463                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2464                .collect(),
2465            &ExecutionOutcome {
2466                bundle: BundleState::new(
2467                    [(address, None, Some(account.into()), {
2468                        let mut s = HashMap::default();
2469                        s.insert(slot, (U256::ZERO, U256::from(100)));
2470                        s
2471                    })],
2472                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2473                    [],
2474                ),
2475                first_block: 0,
2476                ..Default::default()
2477            },
2478            Default::default(),
2479        )?;
2480        provider_rw.commit()?;
2481
2482        let provider = BlockchainProvider::new(factory)?;
2483
2484        let in_mem_block = in_memory_blocks.first().unwrap();
2485        let senders = in_mem_block.senders().expect("failed to recover senders");
2486        let chain = NewCanonicalChain::Commit {
2487            new: vec![ExecutedBlock {
2488                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2489                    in_mem_block.clone(),
2490                    senders,
2491                )),
2492                execution_output: Arc::new(BlockExecutionOutput {
2493                    state: BundleState::new(
2494                        [(address, None, Some(account.into()), {
2495                            let mut s = HashMap::default();
2496                            s.insert(slot, (U256::from(100), U256::from(200)));
2497                            s
2498                        })],
2499                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2500                        [],
2501                    ),
2502                    result: BlockExecutionResult {
2503                        receipts: Default::default(),
2504                        requests: Default::default(),
2505                        gas_used: 0,
2506                        blob_gas_used: 0,
2507                    },
2508                }),
2509                ..Default::default()
2510            }],
2511        };
2512        provider.canonical_in_memory_state.update_chain(chain);
2513
2514        let consistent_provider = provider.consistent_provider()?;
2515
2516        let db_changeset = consistent_provider.storage_changeset(0)?;
2517        let mem_changeset = consistent_provider.storage_changeset(1)?;
2518
2519        let slot_b256 = B256::from(slot);
2520
2521        assert_eq!(db_changeset.len(), 1);
2522        assert_eq!(mem_changeset.len(), 1);
2523
2524        let db_key = db_changeset[0].1.key.as_b256();
2525        let mem_key = mem_changeset[0].1.key.as_b256();
2526
2527        assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2528        assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2529        assert_eq!(
2530            db_key, mem_key,
2531            "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2532        );
2533
2534        Ok(())
2535    }
2536
2537    #[test]
2538    fn test_storage_changesets_range_consistent_keys_hashed_state() -> eyre::Result<()> {
2539        use alloy_primitives::U256;
2540        use reth_db_api::models::StorageSettings;
2541        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2542        use std::collections::HashMap;
2543
2544        let mut rng = generators::rng();
2545        let factory = create_test_provider_factory();
2546        factory.set_storage_settings_cache(StorageSettings::v2());
2547
2548        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2549
2550        let address = alloy_primitives::Address::with_last_byte(1);
2551        let account = reth_primitives_traits::Account {
2552            nonce: 1,
2553            balance: U256::from(1000),
2554            bytecode_hash: None,
2555        };
2556        let slot = U256::from(0x42);
2557
2558        let provider_rw = factory.provider_rw()?;
2559        provider_rw.append_blocks_with_state(
2560            database_blocks
2561                .into_iter()
2562                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2563                .collect(),
2564            &ExecutionOutcome {
2565                bundle: BundleState::new(
2566                    [(address, None, Some(account.into()), {
2567                        let mut s = HashMap::default();
2568                        s.insert(slot, (U256::ZERO, U256::from(100)));
2569                        s
2570                    })],
2571                    vec![
2572                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2573                        vec![],
2574                    ],
2575                    [],
2576                ),
2577                first_block: 0,
2578                ..Default::default()
2579            },
2580            Default::default(),
2581        )?;
2582        provider_rw.commit()?;
2583
2584        let provider = BlockchainProvider::new(factory)?;
2585
2586        let in_mem_block = in_memory_blocks.first().unwrap();
2587        let senders = in_mem_block.senders().expect("failed to recover senders");
2588        let chain = NewCanonicalChain::Commit {
2589            new: vec![ExecutedBlock {
2590                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2591                    in_mem_block.clone(),
2592                    senders,
2593                )),
2594                execution_output: Arc::new(BlockExecutionOutput {
2595                    state: BundleState::new(
2596                        [(address, None, Some(account.into()), {
2597                            let mut s = HashMap::default();
2598                            s.insert(slot, (U256::from(100), U256::from(200)));
2599                            s
2600                        })],
2601                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2602                        [],
2603                    ),
2604                    result: BlockExecutionResult {
2605                        receipts: Default::default(),
2606                        requests: Default::default(),
2607                        gas_used: 0,
2608                        blob_gas_used: 0,
2609                    },
2610                }),
2611                ..Default::default()
2612            }],
2613        };
2614        provider.canonical_in_memory_state.update_chain(chain);
2615
2616        let consistent_provider = provider.consistent_provider()?;
2617
2618        let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2619
2620        assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2621
2622        let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key.as_b256()).collect();
2623
2624        assert_eq!(
2625            keys[0], keys[1],
2626            "same logical slot should produce identical keys whether from DB or memory"
2627        );
2628
2629        Ok(())
2630    }
2631
2632    #[test]
2633    fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2634        use alloy_primitives::U256;
2635        use reth_db_api::models::StorageSettings;
2636        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2637        use std::collections::HashMap;
2638
2639        let mut rng = generators::rng();
2640        let factory = create_test_provider_factory();
2641        factory.set_storage_settings_cache(StorageSettings::v1());
2642
2643        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2644
2645        let address = alloy_primitives::Address::with_last_byte(1);
2646        let account = reth_primitives_traits::Account {
2647            nonce: 1,
2648            balance: U256::from(1000),
2649            bytecode_hash: None,
2650        };
2651        let slot = U256::from(0x42);
2652
2653        let provider_rw = factory.provider_rw()?;
2654        provider_rw.append_blocks_with_state(
2655            database_blocks
2656                .into_iter()
2657                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2658                .collect(),
2659            &ExecutionOutcome {
2660                bundle: BundleState::new(
2661                    [(address, None, Some(account.into()), {
2662                        let mut s = HashMap::default();
2663                        s.insert(slot, (U256::ZERO, U256::from(100)));
2664                        s
2665                    })],
2666                    vec![
2667                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2668                        vec![],
2669                    ],
2670                    [],
2671                ),
2672                first_block: 0,
2673                ..Default::default()
2674            },
2675            Default::default(),
2676        )?;
2677        provider_rw.commit()?;
2678
2679        let provider = BlockchainProvider::new(factory)?;
2680
2681        let in_mem_block = in_memory_blocks.first().unwrap();
2682        let senders = in_mem_block.senders().expect("failed to recover senders");
2683        let chain = NewCanonicalChain::Commit {
2684            new: vec![ExecutedBlock {
2685                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2686                    in_mem_block.clone(),
2687                    senders,
2688                )),
2689                execution_output: Arc::new(BlockExecutionOutput {
2690                    state: BundleState::new(
2691                        [(address, None, Some(account.into()), {
2692                            let mut s = HashMap::default();
2693                            s.insert(slot, (U256::from(100), U256::from(200)));
2694                            s
2695                        })],
2696                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2697                        [],
2698                    ),
2699                    result: BlockExecutionResult {
2700                        receipts: Default::default(),
2701                        requests: Default::default(),
2702                        gas_used: 0,
2703                        blob_gas_used: 0,
2704                    },
2705                }),
2706                ..Default::default()
2707            }],
2708        };
2709        provider.canonical_in_memory_state.update_chain(chain);
2710
2711        let consistent_provider = provider.consistent_provider()?;
2712
2713        let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2714
2715        assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2716
2717        let slot_b256 = B256::from(slot);
2718        let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key.as_b256()).collect();
2719
2720        assert_eq!(
2721            keys[0], keys[1],
2722            "same logical slot should produce identical keys whether from DB or memory"
2723        );
2724        assert_eq!(
2725            keys[0], slot_b256,
2726            "keys should be plain/unhashed when use_hashed_state is false"
2727        );
2728
2729        Ok(())
2730    }
2731}