Skip to main content

reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5    BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8    TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12    eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13    HashOrNumber,
14};
15use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
17use reth_chainspec::ChainInfo;
18use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
19use reth_execution_types::ExecutionOutcome;
20use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
21use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
22use reth_prune_types::{PruneCheckpoint, PruneSegment};
23use reth_stages_types::{StageCheckpoint, StageId};
24use reth_static_file_types::StaticFileSegment;
25use reth_storage_api::{
26    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
27    StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
28};
29use reth_storage_errors::provider::ProviderResult;
30use revm_database::states::PlainStorageRevert;
31use std::{
32    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
33    sync::Arc,
34};
35use tracing::trace;
36
37/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
38/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
39/// this provider.
40///
41/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
42/// time-out.
43#[derive(Debug)]
44#[doc(hidden)] // triggers ICE for `cargo docs`
45pub struct ConsistentProvider<N: ProviderNodeTypes> {
46    /// Storage provider.
47    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
48    /// Head block at time of [`Self`] creation
49    head_block: Option<Arc<BlockState<N::Primitives>>>,
50    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
51    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
52}
53
54impl<N: ProviderNodeTypes> ConsistentProvider<N> {
55    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
56    ///
57    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
58    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
59    /// view of memory and database.
60    pub fn new(
61        storage_provider_factory: ProviderFactory<N>,
62        state: CanonicalInMemoryState<N::Primitives>,
63    ) -> ProviderResult<Self> {
64        // Each one provides a snapshot at the time of instantiation, but its order matters.
65        //
66        // If we acquire first the database provider, it's possible that before the in-memory chain
67        // snapshot is instantiated, it will flush blocks to disk. This would
68        // mean that our database provider would not have access to the flushed blocks (since it's
69        // working under an older view), while the in-memory state may have deleted them
70        // entirely. Resulting in gaps on the range.
71        let head_block = state.head_state();
72        let storage_provider = storage_provider_factory.database_provider_ro()?;
73        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
74    }
75
76    // Helper function to convert range bounds
77    fn convert_range_bounds<T>(
78        &self,
79        range: impl RangeBounds<T>,
80        end_unbounded: impl FnOnce() -> T,
81    ) -> (T, T)
82    where
83        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
84    {
85        let start = match range.start_bound() {
86            Bound::Included(&n) => n,
87            Bound::Excluded(&n) => n + T::from(1u8),
88            Bound::Unbounded => T::from(0u8),
89        };
90
91        let end = match range.end_bound() {
92            Bound::Included(&n) => n,
93            Bound::Excluded(&n) => n - T::from(1u8),
94            Bound::Unbounded => end_unbounded(),
95        };
96
97        (start, end)
98    }
99
100    /// Storage provider for latest block
101    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
102        trace!(target: "providers::blockchain", "Getting latest block state provider");
103
104        // use latest state provider if the head state exists
105        if let Some(state) = &self.head_block {
106            trace!(target: "providers::blockchain", "Using head state for latest state provider");
107            Ok(self.block_state_provider_ref(state)?.boxed())
108        } else {
109            trace!(target: "providers::blockchain", "Using database state for latest state provider");
110            Ok(self.storage_provider.latest())
111        }
112    }
113
114    fn history_by_block_hash_ref<'a>(
115        &'a self,
116        block_hash: BlockHash,
117    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
118        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
119
120        self.get_in_memory_or_storage_by_block(
121            block_hash.into(),
122            |_| self.storage_provider.history_by_block_hash(block_hash),
123            |block_state| {
124                let state_provider = self.block_state_provider_ref(block_state)?;
125                Ok(Box::new(state_provider))
126            },
127        )
128    }
129
130    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
131    /// is met.
132    ///
133    /// Creates a snapshot of the in-memory chain state and database provider to prevent
134    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
135    /// recent in-memory blocks in case of overlaps.
136    ///
137    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
138    ///   user to retrieve the required items from the database using [`RangeInclusive`].
139    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
140    ///   state, allowing for selection or filtering for the desired data.
141    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
142        &self,
143        range: impl RangeBounds<BlockNumber>,
144        fetch_db_range: F,
145        map_block_state_item: G,
146        mut predicate: P,
147    ) -> ProviderResult<Vec<T>>
148    where
149        F: FnOnce(
150            &DatabaseProviderRO<N::DB, N>,
151            RangeInclusive<BlockNumber>,
152            &mut P,
153        ) -> ProviderResult<Vec<T>>,
154        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
155        P: FnMut(&T) -> bool,
156    {
157        // Each one provides a snapshot at the time of instantiation, but its order matters.
158        //
159        // If we acquire first the database provider, it's possible that before the in-memory chain
160        // snapshot is instantiated, it will flush blocks to disk. This would
161        // mean that our database provider would not have access to the flushed blocks (since it's
162        // working under an older view), while the in-memory state may have deleted them
163        // entirely. Resulting in gaps on the range.
164        let mut in_memory_chain =
165            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
166        let db_provider = &self.storage_provider;
167
168        let (start, end) = self.convert_range_bounds(range, || {
169            // the first block is the highest one.
170            in_memory_chain
171                .first()
172                .map(|b| b.number())
173                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
174        });
175
176        if start > end {
177            return Ok(vec![])
178        }
179
180        // Split range into storage_range and in-memory range. If the in-memory range is not
181        // necessary drop it early.
182        //
183        // The last block of `in_memory_chain` is the lowest block number.
184        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
185            Some(lowest_memory_block) if lowest_memory_block <= end => {
186                let highest_memory_block =
187                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
188
189                // Database will for a time overlap with in-memory-chain blocks. In
190                // case of a re-org, it can mean that the database blocks are of a forked chain, and
191                // so, we should prioritize the in-memory overlapped blocks.
192                let in_memory_range =
193                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
194
195                // If requested range is in the middle of the in-memory range, remove the necessary
196                // lowest blocks
197                in_memory_chain.truncate(
198                    in_memory_chain
199                        .len()
200                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
201                );
202
203                let storage_range =
204                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
205
206                (Some((in_memory_chain, in_memory_range)), storage_range)
207            }
208            _ => {
209                // Drop the in-memory chain so we don't hold blocks in memory.
210                drop(in_memory_chain);
211
212                (None, Some(start..=end))
213            }
214        };
215
216        let mut items = Vec::with_capacity((end - start + 1) as usize);
217
218        if let Some(storage_range) = storage_range {
219            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
220            items.append(&mut db_items);
221
222            // The predicate was not met, if the number of items differs from the expected. So, we
223            // return what we have.
224            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
225                return Ok(items)
226            }
227        }
228
229        if let Some((in_memory_chain, in_memory_range)) = in_memory {
230            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
231                debug_assert!(num == block.number());
232                if let Some(item) = map_block_state_item(block, &mut predicate) {
233                    items.push(item);
234                } else {
235                    break
236                }
237            }
238        }
239
240        Ok(items)
241    }
242
243    /// This uses a given [`BlockState`] to initialize a state provider for that block.
244    fn block_state_provider_ref(
245        &self,
246        state: &BlockState<N::Primitives>,
247    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
248        let anchor_hash = state.anchor().hash;
249        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
250        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
251        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
252    }
253
254    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
255    ///
256    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
257    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
258    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
259    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
260        &self,
261        range: impl RangeBounds<BlockNumber>,
262        fetch_from_db: S,
263        fetch_from_block_state: M,
264    ) -> ProviderResult<Vec<R>>
265    where
266        S: FnOnce(
267            &DatabaseProviderRO<N::DB, N>,
268            RangeInclusive<TxNumber>,
269        ) -> ProviderResult<Vec<R>>,
270        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
271    {
272        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
273        let provider = &self.storage_provider;
274
275        // Get the last block number stored in the storage which does NOT overlap with in-memory
276        // chain.
277        let last_database_block_number = in_mem_chain
278            .last()
279            .map(|b| Ok(b.anchor().number))
280            .unwrap_or_else(|| provider.last_block_number())?;
281
282        // Get the next tx number for the last block stored in the storage, which marks the start of
283        // the in-memory state.
284        let last_block_body_index = provider
285            .block_body_indices(last_database_block_number)?
286            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
287        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
288
289        let (start, end) = self.convert_range_bounds(range, || {
290            in_mem_chain
291                .iter()
292                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
293                .sum::<u64>() +
294                last_block_body_index.last_tx_num()
295        });
296
297        if start > end {
298            return Ok(vec![])
299        }
300
301        let mut tx_range = start..=end;
302
303        // If the range is entirely before the first in-memory transaction number, fetch from
304        // storage
305        if *tx_range.end() < in_memory_tx_num {
306            return fetch_from_db(provider, tx_range);
307        }
308
309        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
310
311        // If the range spans storage and memory, get elements from storage first.
312        if *tx_range.start() < in_memory_tx_num {
313            // Determine the range that needs to be fetched from storage.
314            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
315
316            // Set the remaining transaction range for in-memory
317            tx_range = in_memory_tx_num..=*tx_range.end();
318
319            items.extend(fetch_from_db(provider, db_range)?);
320        }
321
322        // Iterate from the lowest block to the highest in-memory chain
323        for block_state in in_mem_chain.iter().rev() {
324            let block_tx_count =
325                block_state.block_ref().recovered_block().body().transactions().len();
326            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
327
328            // If the transaction range start is equal or higher than the next block first
329            // transaction, advance
330            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
331                in_memory_tx_num += block_tx_count as u64;
332                continue
333            }
334
335            // This should only be more than 0 once, in case of a partial range inside a block.
336            let skip = (tx_range.start() - in_memory_tx_num) as usize;
337
338            items.extend(fetch_from_block_state(
339                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
340                block_state,
341            )?);
342
343            in_memory_tx_num += block_tx_count as u64;
344
345            // Break if the range has been fully processed
346            if in_memory_tx_num > *tx_range.end() {
347                break
348            }
349
350            // Set updated range
351            tx_range = in_memory_tx_num..=*tx_range.end();
352        }
353
354        Ok(items)
355    }
356
357    /// Fetches data from either in-memory state or persistent storage by transaction
358    /// [`HashOrNumber`].
359    fn get_in_memory_or_storage_by_tx<S, M, R>(
360        &self,
361        id: HashOrNumber,
362        fetch_from_db: S,
363        fetch_from_block_state: M,
364    ) -> ProviderResult<Option<R>>
365    where
366        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
367        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
368    {
369        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
370        let provider = &self.storage_provider;
371
372        // Get the last block number stored in the database which does NOT overlap with in-memory
373        // chain.
374        let last_database_block_number = in_mem_chain
375            .last()
376            .map(|b| Ok(b.anchor().number))
377            .unwrap_or_else(|| provider.last_block_number())?;
378
379        // Get the next tx number for the last block stored in the database and consider it the
380        // first tx number of the in-memory state
381        let last_block_body_index = provider
382            .block_body_indices(last_database_block_number)?
383            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
384        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
385
386        // If the transaction number is less than the first in-memory transaction number, make a
387        // database lookup
388        if let HashOrNumber::Number(id) = id &&
389            id < in_memory_tx_num
390        {
391            return fetch_from_db(provider)
392        }
393
394        // Iterate from the lowest block to the highest
395        for block_state in in_mem_chain.iter().rev() {
396            let executed_block = block_state.block_ref();
397            let block = executed_block.recovered_block();
398
399            for tx_index in 0..block.body().transactions().len() {
400                match id {
401                    HashOrNumber::Hash(tx_hash) => {
402                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
403                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
404                        }
405                    }
406                    HashOrNumber::Number(id) => {
407                        if id == in_memory_tx_num {
408                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
409                        }
410                    }
411                }
412
413                in_memory_tx_num += 1;
414            }
415        }
416
417        // Not found in-memory, so check database.
418        if let HashOrNumber::Hash(_) = id {
419            return fetch_from_db(provider)
420        }
421
422        Ok(None)
423    }
424
425    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
426    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
427        &self,
428        id: BlockHashOrNumber,
429        fetch_from_db: S,
430        fetch_from_block_state: M,
431    ) -> ProviderResult<R>
432    where
433        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
434        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
435    {
436        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
437            return fetch_from_block_state(block_state)
438        }
439        fetch_from_db(&self.storage_provider)
440    }
441
442    /// Consumes the provider and returns a state provider for the specific block hash.
443    pub(crate) fn into_state_provider_at_block_hash(
444        self,
445        block_hash: BlockHash,
446    ) -> ProviderResult<StateProviderBox> {
447        // Resolve block number and verify it's canonical before destructuring self
448        let block_number =
449            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
450        self.ensure_canonical_block(block_number)?;
451
452        let Self { storage_provider, head_block, .. } = self;
453        if let Some(Some(block_state)) =
454            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
455        {
456            let anchor_hash = block_state.anchor().hash;
457            let block_number = storage_provider
458                .block_number(anchor_hash)?
459                .ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
460            let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
461            return Ok(Box::new(block_state.state_provider(latest_historical)));
462        }
463        storage_provider.try_into_history_at_block(block_number)
464    }
465}
466
467impl<N: ProviderNodeTypes> ConsistentProvider<N> {
468    /// Ensures that the given block number is canonical (synced)
469    ///
470    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
471    /// out of range and would lead to invalid results, mainly during initial sync.
472    ///
473    /// Verifying the `block_number` would be expensive since we need to lookup sync table
474    /// Instead, we ensure that the `block_number` is within the range of the
475    /// [`Self::best_block_number`] which is updated when a block is synced.
476    #[inline]
477    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
478        let latest = self.best_block_number()?;
479        if block_number > latest {
480            Err(ProviderError::HeaderNotFound(block_number.into()))
481        } else {
482            Ok(())
483        }
484    }
485}
486
487impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
488    type Primitives = N::Primitives;
489}
490
491impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
492    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
493        self.storage_provider.static_file_provider()
494    }
495
496    fn get_static_file_writer(
497        &self,
498        block: BlockNumber,
499        segment: StaticFileSegment,
500    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
501        self.storage_provider.get_static_file_writer(block, segment)
502    }
503}
504
505impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
506    type Header = HeaderTy<N>;
507
508    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
509        self.get_in_memory_or_storage_by_block(
510            block_hash.into(),
511            |db_provider| db_provider.header(block_hash),
512            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
513        )
514    }
515
516    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
517        self.get_in_memory_or_storage_by_block(
518            num.into(),
519            |db_provider| db_provider.header_by_number(num),
520            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
521        )
522    }
523
524    fn headers_range(
525        &self,
526        range: impl RangeBounds<BlockNumber>,
527    ) -> ProviderResult<Vec<Self::Header>> {
528        self.get_in_memory_or_storage_by_block_range_while(
529            range,
530            |db_provider, range, _| db_provider.headers_range(range),
531            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
532            |_| true,
533        )
534    }
535
536    fn sealed_header(
537        &self,
538        number: BlockNumber,
539    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
540        self.get_in_memory_or_storage_by_block(
541            number.into(),
542            |db_provider| db_provider.sealed_header(number),
543            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
544        )
545    }
546
547    fn sealed_headers_range(
548        &self,
549        range: impl RangeBounds<BlockNumber>,
550    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
551        self.get_in_memory_or_storage_by_block_range_while(
552            range,
553            |db_provider, range, _| db_provider.sealed_headers_range(range),
554            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
555            |_| true,
556        )
557    }
558
559    fn sealed_headers_while(
560        &self,
561        range: impl RangeBounds<BlockNumber>,
562        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
563    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
564        self.get_in_memory_or_storage_by_block_range_while(
565            range,
566            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
567            |block_state, predicate| {
568                let header = block_state.block_ref().recovered_block().sealed_header();
569                predicate(header).then(|| header.clone())
570            },
571            predicate,
572        )
573    }
574}
575
576impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
577    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
578        self.get_in_memory_or_storage_by_block(
579            number.into(),
580            |db_provider| db_provider.block_hash(number),
581            |block_state| Ok(Some(block_state.hash())),
582        )
583    }
584
585    fn canonical_hashes_range(
586        &self,
587        start: BlockNumber,
588        end: BlockNumber,
589    ) -> ProviderResult<Vec<B256>> {
590        self.get_in_memory_or_storage_by_block_range_while(
591            start..end,
592            |db_provider, inclusive_range, _| {
593                db_provider
594                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
595            },
596            |block_state, _| Some(block_state.hash()),
597            |_| true,
598        )
599    }
600}
601
602impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
603    fn chain_info(&self) -> ProviderResult<ChainInfo> {
604        let best_number = self.best_block_number()?;
605        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
606    }
607
608    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
609        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
610    }
611
612    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
613        self.storage_provider.last_block_number()
614    }
615
616    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
617        self.get_in_memory_or_storage_by_block(
618            hash.into(),
619            |db_provider| db_provider.block_number(hash),
620            |block_state| Ok(Some(block_state.number())),
621        )
622    }
623}
624
625impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
626    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
627        Ok(self.canonical_in_memory_state.pending_block_num_hash())
628    }
629
630    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
631        Ok(self.canonical_in_memory_state.get_safe_num_hash())
632    }
633
634    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
635        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
636    }
637}
638
639impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
640    type Block = BlockTy<N>;
641
642    fn find_block_by_hash(
643        &self,
644        hash: B256,
645        source: BlockSource,
646    ) -> ProviderResult<Option<Self::Block>> {
647        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
648            let Some(block) = self.get_in_memory_or_storage_by_block(
649                hash.into(),
650                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
651                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
652            )?
653        {
654            return Ok(Some(block))
655        }
656
657        if matches!(source, BlockSource::Pending | BlockSource::Any) {
658            return Ok(self
659                .canonical_in_memory_state
660                .pending_block()
661                .filter(|b| b.hash() == hash)
662                .map(|b| b.into_block()))
663        }
664
665        Ok(None)
666    }
667
668    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
669        self.get_in_memory_or_storage_by_block(
670            id,
671            |db_provider| db_provider.block(id),
672            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
673        )
674    }
675
676    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
677        Ok(self.canonical_in_memory_state.pending_recovered_block())
678    }
679
680    fn pending_block_and_receipts(
681        &self,
682    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
683        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
684    }
685
686    /// Returns the block with senders with matching number or hash from database.
687    ///
688    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
689    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
690    ///
691    /// Returns `None` if block is not found.
692    fn recovered_block(
693        &self,
694        id: BlockHashOrNumber,
695        transaction_kind: TransactionVariant,
696    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
697        self.get_in_memory_or_storage_by_block(
698            id,
699            |db_provider| db_provider.recovered_block(id, transaction_kind),
700            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
701        )
702    }
703
704    fn sealed_block_with_senders(
705        &self,
706        id: BlockHashOrNumber,
707        transaction_kind: TransactionVariant,
708    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
709        self.get_in_memory_or_storage_by_block(
710            id,
711            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
712            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
713        )
714    }
715
716    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
717        self.get_in_memory_or_storage_by_block_range_while(
718            range,
719            |db_provider, range, _| db_provider.block_range(range),
720            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
721            |_| true,
722        )
723    }
724
725    fn block_with_senders_range(
726        &self,
727        range: RangeInclusive<BlockNumber>,
728    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
729        self.get_in_memory_or_storage_by_block_range_while(
730            range,
731            |db_provider, range, _| db_provider.block_with_senders_range(range),
732            |block_state, _| Some(block_state.block().recovered_block().clone()),
733            |_| true,
734        )
735    }
736
737    fn recovered_block_range(
738        &self,
739        range: RangeInclusive<BlockNumber>,
740    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
741        self.get_in_memory_or_storage_by_block_range_while(
742            range,
743            |db_provider, range, _| db_provider.recovered_block_range(range),
744            |block_state, _| Some(block_state.block().recovered_block().clone()),
745            |_| true,
746        )
747    }
748
749    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
750        self.get_in_memory_or_storage_by_tx(
751            id.into(),
752            |db_provider| db_provider.block_by_transaction_id(id),
753            |_, _, block_state| Ok(Some(block_state.number())),
754        )
755    }
756}
757
758impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
759    type Transaction = TxTy<N>;
760
761    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
762        self.get_in_memory_or_storage_by_tx(
763            tx_hash.into(),
764            |db_provider| db_provider.transaction_id(tx_hash),
765            |_, tx_number, _| Ok(Some(tx_number)),
766        )
767    }
768
769    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
770        self.get_in_memory_or_storage_by_tx(
771            id.into(),
772            |provider| provider.transaction_by_id(id),
773            |tx_index, _, block_state| {
774                Ok(block_state
775                    .block_ref()
776                    .recovered_block()
777                    .body()
778                    .transactions()
779                    .get(tx_index)
780                    .cloned())
781            },
782        )
783    }
784
785    fn transaction_by_id_unhashed(
786        &self,
787        id: TxNumber,
788    ) -> ProviderResult<Option<Self::Transaction>> {
789        self.get_in_memory_or_storage_by_tx(
790            id.into(),
791            |provider| provider.transaction_by_id_unhashed(id),
792            |tx_index, _, block_state| {
793                Ok(block_state
794                    .block_ref()
795                    .recovered_block()
796                    .body()
797                    .transactions()
798                    .get(tx_index)
799                    .cloned())
800            },
801        )
802    }
803
804    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
805        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
806            return Ok(Some(tx))
807        }
808
809        self.storage_provider.transaction_by_hash(hash)
810    }
811
812    fn transaction_by_hash_with_meta(
813        &self,
814        tx_hash: TxHash,
815    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
816        if let Some((tx, meta)) =
817            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
818        {
819            return Ok(Some((tx, meta)))
820        }
821
822        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
823    }
824
825    fn transactions_by_block(
826        &self,
827        id: BlockHashOrNumber,
828    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
829        self.get_in_memory_or_storage_by_block(
830            id,
831            |provider| provider.transactions_by_block(id),
832            |block_state| {
833                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
834            },
835        )
836    }
837
838    fn transactions_by_block_range(
839        &self,
840        range: impl RangeBounds<BlockNumber>,
841    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
842        self.get_in_memory_or_storage_by_block_range_while(
843            range,
844            |db_provider, range, _| db_provider.transactions_by_block_range(range),
845            |block_state, _| {
846                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
847            },
848            |_| true,
849        )
850    }
851
852    fn transactions_by_tx_range(
853        &self,
854        range: impl RangeBounds<TxNumber>,
855    ) -> ProviderResult<Vec<Self::Transaction>> {
856        self.get_in_memory_or_storage_by_tx_range(
857            range,
858            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
859            |index_range, block_state| {
860                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
861                    .to_vec())
862            },
863        )
864    }
865
866    fn senders_by_tx_range(
867        &self,
868        range: impl RangeBounds<TxNumber>,
869    ) -> ProviderResult<Vec<Address>> {
870        self.get_in_memory_or_storage_by_tx_range(
871            range,
872            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
873            |index_range, block_state| {
874                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
875            },
876        )
877    }
878
879    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
880        self.get_in_memory_or_storage_by_tx(
881            id.into(),
882            |provider| provider.transaction_sender(id),
883            |tx_index, _, block_state| {
884                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
885            },
886        )
887    }
888}
889
890impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
891    type Receipt = ReceiptTy<N>;
892
893    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
894        self.get_in_memory_or_storage_by_tx(
895            id.into(),
896            |provider| provider.receipt(id),
897            |tx_index, _, block_state| {
898                Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
899            },
900        )
901    }
902
903    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
904        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
905            let executed_block = block_state.block_ref();
906            let block = executed_block.recovered_block();
907            let receipts = block_state.executed_block_receipts_ref();
908
909            // assuming 1:1 correspondence between transactions and receipts
910            debug_assert_eq!(
911                block.body().transactions().len(),
912                receipts.len(),
913                "Mismatch between transaction and receipt count"
914            );
915
916            if let Some(tx_index) =
917                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
918            {
919                // safe to use tx_index for receipts due to 1:1 correspondence
920                return Ok(receipts.get(tx_index).cloned());
921            }
922        }
923
924        self.storage_provider.receipt_by_hash(hash)
925    }
926
927    fn receipts_by_block(
928        &self,
929        block: BlockHashOrNumber,
930    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
931        self.get_in_memory_or_storage_by_block(
932            block,
933            |db_provider| db_provider.receipts_by_block(block),
934            |block_state| Ok(Some(block_state.executed_block_receipts())),
935        )
936    }
937
938    fn receipts_by_tx_range(
939        &self,
940        range: impl RangeBounds<TxNumber>,
941    ) -> ProviderResult<Vec<Self::Receipt>> {
942        self.get_in_memory_or_storage_by_tx_range(
943            range,
944            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
945            |index_range, block_state| {
946                Ok(block_state.executed_block_receipts_ref()[index_range].to_vec())
947            },
948        )
949    }
950
951    fn receipts_by_block_range(
952        &self,
953        block_range: RangeInclusive<BlockNumber>,
954    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
955        self.storage_provider.receipts_by_block_range(block_range)
956    }
957}
958
959impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
960    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
961        match block {
962            BlockId::Hash(rpc_block_hash) => {
963                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
964                if receipts.is_none() &&
965                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
966                    let Some(state) = self
967                        .head_block
968                        .as_ref()
969                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
970                {
971                    receipts = Some(state.executed_block_receipts());
972                }
973                Ok(receipts)
974            }
975            BlockId::Number(num_tag) => match num_tag {
976                BlockNumberOrTag::Pending => Ok(self
977                    .canonical_in_memory_state
978                    .pending_state()
979                    .map(|block_state| block_state.executed_block_receipts())),
980                _ => {
981                    if let Some(num) = self.convert_block_number(num_tag)? {
982                        self.receipts_by_block(num.into())
983                    } else {
984                        Ok(None)
985                    }
986                }
987            },
988        }
989    }
990}
991
992impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
993    fn block_body_indices(
994        &self,
995        number: BlockNumber,
996    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
997        self.get_in_memory_or_storage_by_block(
998            number.into(),
999            |db_provider| db_provider.block_body_indices(number),
1000            |block_state| {
1001                // Find the last block indices on database
1002                let last_storage_block_number = block_state.anchor().number;
1003                let mut stored_indices = self
1004                    .storage_provider
1005                    .block_body_indices(last_storage_block_number)?
1006                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1007
1008                // Prepare our block indices
1009                stored_indices.first_tx_num = stored_indices.next_tx_num();
1010                stored_indices.tx_count = 0;
1011
1012                // Iterate from the lowest block in memory until our target block
1013                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1014                    let block_tx_count =
1015                        state.block_ref().recovered_block().body().transactions().len() as u64;
1016                    if state.block_ref().recovered_block().number() == number {
1017                        stored_indices.tx_count = block_tx_count;
1018                    } else {
1019                        stored_indices.first_tx_num += block_tx_count;
1020                    }
1021                }
1022
1023                Ok(Some(stored_indices))
1024            },
1025        )
1026    }
1027
1028    fn block_body_indices_range(
1029        &self,
1030        range: RangeInclusive<BlockNumber>,
1031    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1032        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1033    }
1034}
1035
1036impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1037    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1038        self.storage_provider.get_stage_checkpoint(id)
1039    }
1040
1041    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1042        self.storage_provider.get_stage_checkpoint_progress(id)
1043    }
1044
1045    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1046        self.storage_provider.get_all_checkpoints()
1047    }
1048}
1049
1050impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1051    fn get_prune_checkpoint(
1052        &self,
1053        segment: PruneSegment,
1054    ) -> ProviderResult<Option<PruneCheckpoint>> {
1055        self.storage_provider.get_prune_checkpoint(segment)
1056    }
1057
1058    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1059        self.storage_provider.get_prune_checkpoints()
1060    }
1061}
1062
1063impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1064    type ChainSpec = N::ChainSpec;
1065
1066    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1067        ChainSpecProvider::chain_spec(&self.storage_provider)
1068    }
1069}
1070
1071impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1072    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1073        match id {
1074            BlockId::Number(num) => self.block_by_number_or_tag(num),
1075            BlockId::Hash(hash) => {
1076                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1077                // so not at the provider level?
1078                // if we decide to do this at a higher level, then we can make this an automatic
1079                // trait impl
1080                if Some(true) == hash.require_canonical {
1081                    // check the database, canonical blocks are only stored in the database
1082                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1083                } else {
1084                    self.block_by_hash(hash.block_hash)
1085                }
1086            }
1087        }
1088    }
1089
1090    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1091        Ok(match id {
1092            BlockNumberOrTag::Latest => {
1093                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1094            }
1095            BlockNumberOrTag::Finalized => {
1096                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1097            }
1098            BlockNumberOrTag::Safe => {
1099                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1100            }
1101            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1102            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1103
1104            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1105        })
1106    }
1107
1108    fn sealed_header_by_number_or_tag(
1109        &self,
1110        id: BlockNumberOrTag,
1111    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1112        match id {
1113            BlockNumberOrTag::Latest => {
1114                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1115            }
1116            BlockNumberOrTag::Finalized => {
1117                Ok(self.canonical_in_memory_state.get_finalized_header())
1118            }
1119            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1120            BlockNumberOrTag::Earliest => self
1121                .header_by_number(self.earliest_block_number()?)?
1122                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1123            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1124            BlockNumberOrTag::Number(num) => self
1125                .header_by_number(num)?
1126                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1127        }
1128    }
1129
1130    fn sealed_header_by_id(
1131        &self,
1132        id: BlockId,
1133    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1134        Ok(match id {
1135            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1136            BlockId::Hash(hash) => self
1137                .header(hash.block_hash)?
1138                .map(|header| SealedHeader::new(header, hash.block_hash)),
1139        })
1140    }
1141
1142    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1143        Ok(match id {
1144            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1145            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1146        })
1147    }
1148}
1149
1150impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1151    fn storage_changeset(
1152        &self,
1153        block_number: BlockNumber,
1154    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1155        if let Some(state) =
1156            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1157        {
1158            let changesets = state
1159                .block()
1160                .execution_output
1161                .state
1162                .reverts
1163                .to_plain_state_reverts()
1164                .storage
1165                .into_iter()
1166                .flatten()
1167                .flat_map(|revert: PlainStorageRevert| {
1168                    revert.storage_revert.into_iter().map(move |(key, value)| {
1169                        let plain_key = B256::from(key.to_be_bytes());
1170                        (
1171                            BlockNumberAddress((block_number, revert.address)),
1172                            StorageEntry { key: plain_key, value: value.to_previous_value() },
1173                        )
1174                    })
1175                })
1176                .collect();
1177            Ok(changesets)
1178        } else {
1179            // Perform checks on whether or not changesets exist for the block.
1180
1181            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1182            let storage_history_exists = self
1183                .storage_provider
1184                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1185                .and_then(|checkpoint| {
1186                    // return true if the block number is ahead of the prune checkpoint.
1187                    //
1188                    // The checkpoint stores the highest pruned block number, so we should make
1189                    // sure the block_number is strictly greater.
1190                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1191                })
1192                .unwrap_or(true);
1193
1194            if !storage_history_exists {
1195                return Err(ProviderError::StateAtBlockPruned(block_number))
1196            }
1197
1198            self.storage_provider.storage_changeset(block_number)
1199        }
1200    }
1201
1202    fn get_storage_before_block(
1203        &self,
1204        block_number: BlockNumber,
1205        address: Address,
1206        storage_key: B256,
1207    ) -> ProviderResult<Option<StorageEntry>> {
1208        if let Some(state) =
1209            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1210        {
1211            let changeset = state
1212                .block_ref()
1213                .execution_output
1214                .state
1215                .reverts
1216                .to_plain_state_reverts()
1217                .storage
1218                .into_iter()
1219                .flatten()
1220                .find_map(|revert: PlainStorageRevert| {
1221                    if revert.address != address {
1222                        return None
1223                    }
1224                    revert.storage_revert.into_iter().find_map(|(key, value)| {
1225                        let plain_key = B256::from(key.to_be_bytes());
1226                        (plain_key == storage_key).then(|| StorageEntry {
1227                            key: plain_key,
1228                            value: value.to_previous_value(),
1229                        })
1230                    })
1231                });
1232            Ok(changeset)
1233        } else {
1234            let storage_history_exists = self
1235                .storage_provider
1236                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1237                .and_then(|checkpoint| {
1238                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1239                })
1240                .unwrap_or(true);
1241
1242            if !storage_history_exists {
1243                return Err(ProviderError::StateAtBlockPruned(block_number))
1244            }
1245
1246            self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1247        }
1248    }
1249
1250    fn storage_changesets_range(
1251        &self,
1252        range: impl RangeBounds<BlockNumber>,
1253    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1254        let range = to_range(range);
1255        let mut changesets = Vec::new();
1256        let database_start = range.start;
1257        let mut database_end = range.end;
1258
1259        if let Some(head_block) = &self.head_block {
1260            database_end = head_block.anchor().number;
1261
1262            for state in head_block.chain() {
1263                let block_changesets = state
1264                    .block_ref()
1265                    .execution_output
1266                    .state
1267                    .reverts
1268                    .to_plain_state_reverts()
1269                    .storage
1270                    .into_iter()
1271                    .flatten()
1272                    .flat_map(|revert: PlainStorageRevert| {
1273                        revert.storage_revert.into_iter().map(move |(key, value)| {
1274                            let plain_key = B256::from(key.to_be_bytes());
1275                            (
1276                                BlockNumberAddress((state.number(), revert.address)),
1277                                StorageEntry { key: plain_key, value: value.to_previous_value() },
1278                            )
1279                        })
1280                    });
1281
1282                changesets.extend(block_changesets);
1283            }
1284        }
1285
1286        if database_start < database_end {
1287            let storage_history_exists = self
1288                .storage_provider
1289                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1290                .and_then(|checkpoint| {
1291                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1292                })
1293                .unwrap_or(true);
1294
1295            if !storage_history_exists {
1296                return Err(ProviderError::StateAtBlockPruned(database_start))
1297            }
1298
1299            let db_changesets = self
1300                .storage_provider
1301                .storage_changesets_range(database_start..=database_end - 1)?;
1302            changesets.extend(db_changesets);
1303        }
1304
1305        changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1306
1307        Ok(changesets)
1308    }
1309}
1310
1311impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1312    fn account_block_changeset(
1313        &self,
1314        block_number: BlockNumber,
1315    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1316        if let Some(state) =
1317            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1318        {
1319            let changesets = state
1320                .block_ref()
1321                .execution_output
1322                .state
1323                .reverts
1324                .to_plain_state_reverts()
1325                .accounts
1326                .into_iter()
1327                .flatten()
1328                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1329                .collect();
1330            Ok(changesets)
1331        } else {
1332            // Perform checks on whether or not changesets exist for the block.
1333
1334            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1335            let account_history_exists = self
1336                .storage_provider
1337                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1338                .and_then(|checkpoint| {
1339                    // return true if the block number is ahead of the prune checkpoint.
1340                    //
1341                    // The checkpoint stores the highest pruned block number, so we should make
1342                    // sure the block_number is strictly greater.
1343                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1344                })
1345                .unwrap_or(true);
1346
1347            if !account_history_exists {
1348                return Err(ProviderError::StateAtBlockPruned(block_number))
1349            }
1350
1351            self.storage_provider.account_block_changeset(block_number)
1352        }
1353    }
1354
1355    fn get_account_before_block(
1356        &self,
1357        block_number: BlockNumber,
1358        address: Address,
1359    ) -> ProviderResult<Option<AccountBeforeTx>> {
1360        if let Some(state) =
1361            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1362        {
1363            // Search in-memory state for the account changeset
1364            let changeset = state
1365                .block_ref()
1366                .execution_output
1367                .state
1368                .reverts
1369                .to_plain_state_reverts()
1370                .accounts
1371                .into_iter()
1372                .flatten()
1373                .find(|(addr, _)| addr == &address)
1374                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1375            Ok(changeset)
1376        } else {
1377            // Perform checks on whether or not changesets exist for the block.
1378            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1379            let account_history_exists = self
1380                .storage_provider
1381                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1382                .and_then(|checkpoint| {
1383                    // return true if the block number is ahead of the prune checkpoint.
1384                    //
1385                    // The checkpoint stores the highest pruned block number, so we should make
1386                    // sure the block_number is strictly greater.
1387                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1388                })
1389                .unwrap_or(true);
1390
1391            if !account_history_exists {
1392                return Err(ProviderError::StateAtBlockPruned(block_number))
1393            }
1394
1395            // Delegate to the storage provider for database lookups
1396            self.storage_provider.get_account_before_block(block_number, address)
1397        }
1398    }
1399
1400    fn account_changesets_range(
1401        &self,
1402        range: impl core::ops::RangeBounds<BlockNumber>,
1403    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1404        let range = to_range(range);
1405        let mut changesets = Vec::new();
1406        let database_start = range.start;
1407        let mut database_end = range.end;
1408
1409        // Check which blocks in the range are in memory
1410        if let Some(head_block) = &self.head_block {
1411            // the anchor is the end of the db range
1412            database_end = head_block.anchor().number;
1413
1414            for state in head_block.chain() {
1415                // found block in memory, collect its changesets
1416                let block_changesets = state
1417                    .block_ref()
1418                    .execution_output
1419                    .state
1420                    .reverts
1421                    .to_plain_state_reverts()
1422                    .accounts
1423                    .into_iter()
1424                    .flatten()
1425                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1426
1427                for changeset in block_changesets {
1428                    changesets.push((state.number(), changeset));
1429                }
1430            }
1431        }
1432
1433        // get changesets from database for remaining blocks
1434        if database_start < database_end {
1435            // check if account history is pruned for these blocks
1436            let account_history_exists = self
1437                .storage_provider
1438                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1439                .and_then(|checkpoint| {
1440                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1441                })
1442                .unwrap_or(true);
1443
1444            if !account_history_exists {
1445                return Err(ProviderError::StateAtBlockPruned(database_start))
1446            }
1447
1448            let db_changesets =
1449                self.storage_provider.account_changesets_range(database_start..database_end)?;
1450            changesets.extend(db_changesets);
1451        }
1452
1453        changesets.sort_by_key(|(block_num, _)| *block_num);
1454
1455        Ok(changesets)
1456    }
1457}
1458
1459impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1460    /// Get basic account information.
1461    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1462        // use latest state provider
1463        let state_provider = self.latest_ref()?;
1464        state_provider.basic_account(address)
1465    }
1466}
1467
1468impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1469    type Receipt = ReceiptTy<N>;
1470
1471    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1472    ///
1473    /// If data for the block does not exist, this will return [`None`].
1474    ///
1475    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1476    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1477    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1478    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1479    /// first place.
1480    fn get_state(
1481        &self,
1482        block: BlockNumber,
1483    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1484        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1485            let state = state.block_ref().execution_outcome().clone();
1486            Ok(Some(ExecutionOutcome::from((state, block))))
1487        } else {
1488            self.storage_provider.get_state(block)
1489        }
1490    }
1491}
1492
1493#[cfg(test)]
1494mod tests {
1495    use crate::{
1496        providers::blockchain_provider::BlockchainProvider,
1497        test_utils::create_test_provider_factory, BlockWriter,
1498    };
1499    use alloy_eips::BlockHashOrNumber;
1500    use alloy_primitives::B256;
1501    use itertools::Itertools;
1502    use rand::Rng;
1503    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1504    use reth_db_api::models::AccountBeforeTx;
1505    use reth_ethereum_primitives::Block;
1506    use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1507    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1508    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader, StateReader};
1509    use reth_testing_utils::generators::{
1510        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1511    };
1512    use revm_database::BundleState;
1513    use std::{
1514        ops::{Bound, Range, RangeBounds},
1515        sync::Arc,
1516    };
1517
1518    const TEST_BLOCKS_COUNT: usize = 5;
1519
1520    fn random_blocks(
1521        rng: &mut impl Rng,
1522        database_blocks: usize,
1523        in_memory_blocks: usize,
1524        requests_count: Option<Range<u8>>,
1525        withdrawals_count: Option<Range<u8>>,
1526        tx_count: impl RangeBounds<u8>,
1527    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1528        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1529
1530        let tx_start = match tx_count.start_bound() {
1531            Bound::Included(&n) | Bound::Excluded(&n) => n,
1532            Bound::Unbounded => u8::MIN,
1533        };
1534        let tx_end = match tx_count.end_bound() {
1535            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1536            Bound::Unbounded => u8::MAX,
1537        };
1538
1539        let blocks = random_block_range(
1540            rng,
1541            0..=block_range,
1542            BlockRangeParams {
1543                parent: Some(B256::ZERO),
1544                tx_count: tx_start..tx_end,
1545                requests_count,
1546                withdrawals_count,
1547            },
1548        );
1549        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1550        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1551    }
1552
1553    #[test]
1554    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1555        // Initialize random number generator and provider factory
1556        let mut rng = generators::rng();
1557        let factory = create_test_provider_factory();
1558
1559        // Generate 10 random blocks and split into database and in-memory blocks
1560        let blocks = random_block_range(
1561            &mut rng,
1562            0..=10,
1563            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1564        );
1565        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1566
1567        // Insert first 5 blocks into the database
1568        let provider_rw = factory.provider_rw()?;
1569        for block in database_blocks {
1570            provider_rw.insert_block(
1571                &block.clone().try_recover().expect("failed to seal block with senders"),
1572            )?;
1573        }
1574        provider_rw.commit()?;
1575
1576        // Create a new provider
1577        let provider = BlockchainProvider::new(factory)?;
1578        let consistent_provider = provider.consistent_provider()?;
1579
1580        // Useful blocks
1581        let first_db_block = database_blocks.first().unwrap();
1582        let first_in_mem_block = in_memory_blocks.first().unwrap();
1583        let last_in_mem_block = in_memory_blocks.last().unwrap();
1584
1585        // No block in memory before setting in memory state
1586        assert_eq!(
1587            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1588            None
1589        );
1590        assert_eq!(
1591            consistent_provider
1592                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1593            None
1594        );
1595        // No pending block in memory
1596        assert_eq!(
1597            consistent_provider
1598                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1599            None
1600        );
1601
1602        // Insert first block into the in-memory state
1603        let in_memory_block_senders =
1604            first_in_mem_block.senders().expect("failed to recover senders");
1605        let chain = NewCanonicalChain::Commit {
1606            new: vec![ExecutedBlock {
1607                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1608                    first_in_mem_block.clone(),
1609                    in_memory_block_senders,
1610                )),
1611                ..Default::default()
1612            }],
1613        };
1614        consistent_provider.canonical_in_memory_state.update_chain(chain);
1615        let consistent_provider = provider.consistent_provider()?;
1616
1617        // Now the block should be found in memory
1618        assert_eq!(
1619            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1620            Some(first_in_mem_block.clone().into_block())
1621        );
1622        assert_eq!(
1623            consistent_provider
1624                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1625            Some(first_in_mem_block.clone().into_block())
1626        );
1627
1628        // Find the first block in database by hash
1629        assert_eq!(
1630            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1631            Some(first_db_block.clone().into_block())
1632        );
1633        assert_eq!(
1634            consistent_provider
1635                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1636            Some(first_db_block.clone().into_block())
1637        );
1638
1639        // No pending block in database
1640        assert_eq!(
1641            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1642            None
1643        );
1644
1645        // Insert the last block into the pending state
1646        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1647            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1648                last_in_mem_block.clone(),
1649                Default::default(),
1650            )),
1651            ..Default::default()
1652        });
1653
1654        // Now the last block should be found in memory
1655        assert_eq!(
1656            consistent_provider
1657                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1658            Some(last_in_mem_block.clone_block())
1659        );
1660
1661        Ok(())
1662    }
1663
1664    #[test]
1665    fn test_block_reader_block() -> eyre::Result<()> {
1666        // Initialize random number generator and provider factory
1667        let mut rng = generators::rng();
1668        let factory = create_test_provider_factory();
1669
1670        // Generate 10 random blocks and split into database and in-memory blocks
1671        let blocks = random_block_range(
1672            &mut rng,
1673            0..=10,
1674            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1675        );
1676        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1677
1678        // Insert first 5 blocks into the database
1679        let provider_rw = factory.provider_rw()?;
1680        for block in database_blocks {
1681            provider_rw.insert_block(
1682                &block.clone().try_recover().expect("failed to seal block with senders"),
1683            )?;
1684        }
1685        provider_rw.commit()?;
1686
1687        // Create a new provider
1688        let provider = BlockchainProvider::new(factory)?;
1689        let consistent_provider = provider.consistent_provider()?;
1690
1691        // First in memory block
1692        let first_in_mem_block = in_memory_blocks.first().unwrap();
1693        // First database block
1694        let first_db_block = database_blocks.first().unwrap();
1695
1696        // First in memory block should not be found yet as not integrated to the in-memory state
1697        assert_eq!(
1698            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1699            None
1700        );
1701        assert_eq!(
1702            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1703            None
1704        );
1705
1706        // Insert first block into the in-memory state
1707        let in_memory_block_senders =
1708            first_in_mem_block.senders().expect("failed to recover senders");
1709        let chain = NewCanonicalChain::Commit {
1710            new: vec![ExecutedBlock {
1711                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1712                    first_in_mem_block.clone(),
1713                    in_memory_block_senders,
1714                )),
1715                ..Default::default()
1716            }],
1717        };
1718        consistent_provider.canonical_in_memory_state.update_chain(chain);
1719
1720        let consistent_provider = provider.consistent_provider()?;
1721
1722        // First in memory block should be found
1723        assert_eq!(
1724            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1725            Some(first_in_mem_block.clone().into_block())
1726        );
1727        assert_eq!(
1728            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1729            Some(first_in_mem_block.clone().into_block())
1730        );
1731
1732        // First database block should be found
1733        assert_eq!(
1734            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1735            Some(first_db_block.clone().into_block())
1736        );
1737        assert_eq!(
1738            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1739            Some(first_db_block.clone().into_block())
1740        );
1741
1742        Ok(())
1743    }
1744
1745    #[test]
1746    fn test_changeset_reader() -> eyre::Result<()> {
1747        let mut rng = generators::rng();
1748
1749        let (database_blocks, in_memory_blocks) =
1750            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1751
1752        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1753        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1754        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1755
1756        let accounts = random_eoa_accounts(&mut rng, 2);
1757
1758        let (database_changesets, database_state) = random_changeset_range(
1759            &mut rng,
1760            &database_blocks,
1761            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1762            0..0,
1763            0..0,
1764        );
1765        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1766            &mut rng,
1767            &in_memory_blocks,
1768            database_state
1769                .iter()
1770                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1771            0..0,
1772            0..0,
1773        );
1774
1775        let factory = create_test_provider_factory();
1776
1777        let provider_rw = factory.provider_rw()?;
1778        provider_rw.append_blocks_with_state(
1779            database_blocks
1780                .into_iter()
1781                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1782                .collect(),
1783            &ExecutionOutcome {
1784                bundle: BundleState::new(
1785                    database_state.into_iter().map(|(address, (account, _))| {
1786                        (address, None, Some(account.into()), Default::default())
1787                    }),
1788                    database_changesets.iter().map(|block_changesets| {
1789                        block_changesets.iter().map(|(address, account, _)| {
1790                            (*address, Some(Some((*account).into())), [])
1791                        })
1792                    }),
1793                    Vec::new(),
1794                ),
1795                first_block: first_database_block,
1796                ..Default::default()
1797            },
1798            Default::default(),
1799        )?;
1800        provider_rw.commit()?;
1801
1802        let provider = BlockchainProvider::new(factory)?;
1803
1804        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1805        let chain = NewCanonicalChain::Commit {
1806            new: vec![in_memory_blocks
1807                .first()
1808                .map(|block| {
1809                    let senders = block.senders().expect("failed to recover senders");
1810                    ExecutedBlock {
1811                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
1812                            block.clone(),
1813                            senders,
1814                        )),
1815                        execution_output: Arc::new(BlockExecutionOutput {
1816                            state: BundleState::new(
1817                                in_memory_state.into_iter().map(|(address, (account, _))| {
1818                                    (address, None, Some(account.into()), Default::default())
1819                                }),
1820                                [in_memory_changesets.iter().map(|(address, account, _)| {
1821                                    (*address, Some(Some((*account).into())), Vec::new())
1822                                })],
1823                                [],
1824                            ),
1825                            result: BlockExecutionResult {
1826                                receipts: Default::default(),
1827                                requests: Default::default(),
1828                                gas_used: 0,
1829                                blob_gas_used: 0,
1830                            },
1831                        }),
1832                        ..Default::default()
1833                    }
1834                })
1835                .unwrap()],
1836        };
1837        provider.canonical_in_memory_state.update_chain(chain);
1838
1839        let consistent_provider = provider.consistent_provider()?;
1840
1841        assert_eq!(
1842            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1843            database_changesets
1844                .into_iter()
1845                .next_back()
1846                .unwrap()
1847                .into_iter()
1848                .sorted_by_key(|(address, _, _)| *address)
1849                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1850                .collect::<Vec<_>>()
1851        );
1852        assert_eq!(
1853            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1854            in_memory_changesets
1855                .into_iter()
1856                .sorted_by_key(|(address, _, _)| *address)
1857                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1858                .collect::<Vec<_>>()
1859        );
1860
1861        Ok(())
1862    }
1863    #[test]
1864    fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
1865        use alloy_primitives::U256;
1866        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
1867        use reth_primitives_traits::StorageEntry;
1868        use reth_storage_api::StorageSettingsCache;
1869        use std::collections::HashMap;
1870
1871        let address = alloy_primitives::Address::with_last_byte(1);
1872        let account = reth_primitives_traits::Account {
1873            nonce: 1,
1874            balance: U256::from(1000),
1875            bytecode_hash: None,
1876        };
1877        let slot = U256::from(0x42);
1878        let slot_b256 = B256::from(slot);
1879
1880        let mut rng = generators::rng();
1881        let factory = create_test_provider_factory();
1882        factory.set_storage_settings_cache(StorageSettings::v1());
1883
1884        let blocks = random_block_range(
1885            &mut rng,
1886            0..=1,
1887            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1888        );
1889
1890        let provider_rw = factory.provider_rw()?;
1891        provider_rw.append_blocks_with_state(
1892            blocks
1893                .into_iter()
1894                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1895                .collect(),
1896            &ExecutionOutcome {
1897                bundle: BundleState::new(
1898                    [(address, None, Some(account.into()), {
1899                        let mut s = HashMap::default();
1900                        s.insert(slot, (U256::ZERO, U256::from(100)));
1901                        s
1902                    })],
1903                    [
1904                        Vec::new(),
1905                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
1906                    ],
1907                    [],
1908                ),
1909                first_block: 0,
1910                ..Default::default()
1911            },
1912            Default::default(),
1913        )?;
1914
1915        provider_rw.tx_ref().put::<tables::PlainStorageState>(
1916            address,
1917            StorageEntry { key: slot_b256, value: U256::from(100) },
1918        )?;
1919        provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
1920
1921        provider_rw.commit()?;
1922
1923        let provider = BlockchainProvider::new(factory)?;
1924        let consistent_provider = provider.consistent_provider()?;
1925
1926        let outcome = consistent_provider.get_state(1)?.expect("should return execution outcome");
1927
1928        let state = &outcome.bundle.state;
1929        let account_state = state.get(&address).expect("should have account in bundle state");
1930        let storage = &account_state.storage;
1931
1932        let storage_slot = storage.get(&slot).expect("should have the slot in storage");
1933
1934        assert_eq!(
1935            storage_slot.present_value,
1936            U256::from(100),
1937            "present_value should be 100 (the actual value in PlainStorageState)"
1938        );
1939
1940        Ok(())
1941    }
1942
1943    #[test]
1944    fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
1945        use alloy_primitives::U256;
1946        use reth_db_api::models::StorageSettings;
1947        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
1948        use std::collections::HashMap;
1949
1950        let mut rng = generators::rng();
1951        let factory = create_test_provider_factory();
1952        factory.set_storage_settings_cache(StorageSettings::v1());
1953
1954        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
1955
1956        let address = alloy_primitives::Address::with_last_byte(1);
1957        let account = reth_primitives_traits::Account {
1958            nonce: 1,
1959            balance: U256::from(1000),
1960            bytecode_hash: None,
1961        };
1962        let slot = U256::from(0x42);
1963
1964        let provider_rw = factory.provider_rw()?;
1965        provider_rw.append_blocks_with_state(
1966            database_blocks
1967                .into_iter()
1968                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1969                .collect(),
1970            &ExecutionOutcome {
1971                bundle: BundleState::new(
1972                    [(address, None, Some(account.into()), {
1973                        let mut s = HashMap::default();
1974                        s.insert(slot, (U256::ZERO, U256::from(100)));
1975                        s
1976                    })],
1977                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
1978                    [],
1979                ),
1980                first_block: 0,
1981                ..Default::default()
1982            },
1983            Default::default(),
1984        )?;
1985        provider_rw.commit()?;
1986
1987        let provider = BlockchainProvider::new(factory)?;
1988
1989        let in_mem_block = in_memory_blocks.first().unwrap();
1990        let senders = in_mem_block.senders().expect("failed to recover senders");
1991        let chain = NewCanonicalChain::Commit {
1992            new: vec![ExecutedBlock {
1993                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1994                    in_mem_block.clone(),
1995                    senders,
1996                )),
1997                execution_output: Arc::new(BlockExecutionOutput {
1998                    state: BundleState::new(
1999                        [(address, None, Some(account.into()), {
2000                            let mut s = HashMap::default();
2001                            s.insert(slot, (U256::from(100), U256::from(200)));
2002                            s
2003                        })],
2004                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2005                        [],
2006                    ),
2007                    result: BlockExecutionResult {
2008                        receipts: Default::default(),
2009                        requests: Default::default(),
2010                        gas_used: 0,
2011                        blob_gas_used: 0,
2012                    },
2013                }),
2014                ..Default::default()
2015            }],
2016        };
2017        provider.canonical_in_memory_state.update_chain(chain);
2018
2019        let consistent_provider = provider.consistent_provider()?;
2020
2021        let db_changeset = consistent_provider.storage_changeset(0)?;
2022        let mem_changeset = consistent_provider.storage_changeset(1)?;
2023
2024        let slot_b256 = B256::from(slot);
2025
2026        assert_eq!(db_changeset.len(), 1);
2027        assert_eq!(mem_changeset.len(), 1);
2028
2029        let db_key = db_changeset[0].1.key;
2030        let mem_key = mem_changeset[0].1.key;
2031
2032        assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2033        assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2034        assert_eq!(
2035            db_key, mem_key,
2036            "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2037        );
2038
2039        Ok(())
2040    }
2041
2042    #[test]
2043    fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2044        use alloy_primitives::U256;
2045        use reth_db_api::models::StorageSettings;
2046        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2047        use std::collections::HashMap;
2048
2049        let mut rng = generators::rng();
2050        let factory = create_test_provider_factory();
2051        factory.set_storage_settings_cache(StorageSettings::v1());
2052
2053        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2054
2055        let address = alloy_primitives::Address::with_last_byte(1);
2056        let account = reth_primitives_traits::Account {
2057            nonce: 1,
2058            balance: U256::from(1000),
2059            bytecode_hash: None,
2060        };
2061        let slot = U256::from(0x42);
2062
2063        let provider_rw = factory.provider_rw()?;
2064        provider_rw.append_blocks_with_state(
2065            database_blocks
2066                .into_iter()
2067                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2068                .collect(),
2069            &ExecutionOutcome {
2070                bundle: BundleState::new(
2071                    [(address, None, Some(account.into()), {
2072                        let mut s = HashMap::default();
2073                        s.insert(slot, (U256::ZERO, U256::from(100)));
2074                        s
2075                    })],
2076                    vec![
2077                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2078                        vec![],
2079                    ],
2080                    [],
2081                ),
2082                first_block: 0,
2083                ..Default::default()
2084            },
2085            Default::default(),
2086        )?;
2087        provider_rw.commit()?;
2088
2089        let provider = BlockchainProvider::new(factory)?;
2090
2091        let in_mem_block = in_memory_blocks.first().unwrap();
2092        let senders = in_mem_block.senders().expect("failed to recover senders");
2093        let chain = NewCanonicalChain::Commit {
2094            new: vec![ExecutedBlock {
2095                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2096                    in_mem_block.clone(),
2097                    senders,
2098                )),
2099                execution_output: Arc::new(BlockExecutionOutput {
2100                    state: BundleState::new(
2101                        [(address, None, Some(account.into()), {
2102                            let mut s = HashMap::default();
2103                            s.insert(slot, (U256::from(100), U256::from(200)));
2104                            s
2105                        })],
2106                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2107                        [],
2108                    ),
2109                    result: BlockExecutionResult {
2110                        receipts: Default::default(),
2111                        requests: Default::default(),
2112                        gas_used: 0,
2113                        blob_gas_used: 0,
2114                    },
2115                }),
2116                ..Default::default()
2117            }],
2118        };
2119        provider.canonical_in_memory_state.update_chain(chain);
2120
2121        let consistent_provider = provider.consistent_provider()?;
2122
2123        let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2124
2125        assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2126
2127        let slot_b256 = B256::from(slot);
2128        let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2129
2130        assert_eq!(
2131            keys[0], keys[1],
2132            "same logical slot should produce identical keys whether from DB or memory"
2133        );
2134        assert_eq!(
2135            keys[0], slot_b256,
2136            "keys should be plain/unhashed when use_hashed_state is false"
2137        );
2138
2139        Ok(())
2140    }
2141}