Skip to main content

reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4    to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5    BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8    TransactionsProvider,
9};
10use alloy_consensus::{
11    transaction::{TransactionMeta, TxHashRef},
12    BlockHeader,
13};
14use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber};
15use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
17use reth_chainspec::ChainInfo;
18use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
19use reth_execution_types::ExecutionOutcome;
20use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
21use reth_primitives_traits::{
22    Account, BlockBody, RecoveredBlock, SealedHeader, SealedOrRecoveredBlock, StorageEntry,
23};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_static_file_types::StaticFileSegment;
27use reth_storage_api::{
28    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
29    StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
30};
31use reth_storage_errors::provider::ProviderResult;
32use revm_database::states::PlainStorageRevert;
33use std::{
34    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
35    sync::Arc,
36};
37use tracing::trace;
38
39/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
40/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
41/// this provider.
42///
43/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
44/// time-out.
45#[derive(Debug)]
46#[doc(hidden)] // triggers ICE for `cargo docs`
47pub struct ConsistentProvider<N: ProviderNodeTypes> {
48    /// Storage provider.
49    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
50    /// Head block at time of [`Self`] creation
51    head_block: Option<Arc<BlockState<N::Primitives>>>,
52    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
53    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
54}
55
56impl<N: ProviderNodeTypes> ConsistentProvider<N> {
57    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
58    ///
59    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
60    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
61    /// view of memory and database.
62    pub fn new(
63        storage_provider_factory: ProviderFactory<N>,
64        state: CanonicalInMemoryState<N::Primitives>,
65    ) -> ProviderResult<Self> {
66        // Each one provides a snapshot at the time of instantiation, but its order matters.
67        //
68        // If we acquire first the database provider, it's possible that before the in-memory chain
69        // snapshot is instantiated, it will flush blocks to disk. This would
70        // mean that our database provider would not have access to the flushed blocks (since it's
71        // working under an older view), while the in-memory state may have deleted them
72        // entirely. Resulting in gaps on the range.
73        let head_block = state.head_state();
74        let storage_provider = storage_provider_factory.database_provider_ro()?;
75        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
76    }
77
78    // Helper function to convert range bounds
79    fn convert_range_bounds<T>(
80        &self,
81        range: impl RangeBounds<T>,
82        end_unbounded: impl FnOnce() -> T,
83    ) -> (T, T)
84    where
85        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
86    {
87        let start = match range.start_bound() {
88            Bound::Included(&n) => n,
89            Bound::Excluded(&n) => n + T::from(1u8),
90            Bound::Unbounded => T::from(0u8),
91        };
92
93        let end = match range.end_bound() {
94            Bound::Included(&n) => n,
95            Bound::Excluded(&n) => n - T::from(1u8),
96            Bound::Unbounded => end_unbounded(),
97        };
98
99        (start, end)
100    }
101
102    /// Storage provider for latest block
103    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
104        trace!(target: "providers::blockchain", "Getting latest block state provider");
105
106        // use latest state provider if the head state exists
107        if let Some(state) = &self.head_block {
108            trace!(target: "providers::blockchain", "Using head state for latest state provider");
109            Ok(self.block_state_provider_ref(state)?.boxed())
110        } else {
111            trace!(target: "providers::blockchain", "Using database state for latest state provider");
112            Ok(self.storage_provider.latest())
113        }
114    }
115
116    fn history_by_block_hash_ref<'a>(
117        &'a self,
118        block_hash: BlockHash,
119    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
120        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
121
122        self.get_in_memory_or_storage_by_block(
123            block_hash.into(),
124            |_| self.storage_provider.history_by_block_hash(block_hash),
125            |block_state| {
126                let state_provider = self.block_state_provider_ref(block_state)?;
127                Ok(Box::new(state_provider))
128            },
129        )
130    }
131
132    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
133    /// is met.
134    ///
135    /// Creates a snapshot of the in-memory chain state and database provider to prevent
136    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
137    /// recent in-memory blocks in case of overlaps.
138    ///
139    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
140    ///   user to retrieve the required items from the database using [`RangeInclusive`].
141    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
142    ///   state, allowing for selection or filtering for the desired data.
143    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
144        &self,
145        range: impl RangeBounds<BlockNumber>,
146        fetch_db_range: F,
147        map_block_state_item: G,
148        mut predicate: P,
149    ) -> ProviderResult<Vec<T>>
150    where
151        F: FnOnce(
152            &DatabaseProviderRO<N::DB, N>,
153            RangeInclusive<BlockNumber>,
154            &mut P,
155        ) -> ProviderResult<Vec<T>>,
156        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
157        P: FnMut(&T) -> bool,
158    {
159        // Each one provides a snapshot at the time of instantiation, but its order matters.
160        //
161        // If we acquire first the database provider, it's possible that before the in-memory chain
162        // snapshot is instantiated, it will flush blocks to disk. This would
163        // mean that our database provider would not have access to the flushed blocks (since it's
164        // working under an older view), while the in-memory state may have deleted them
165        // entirely. Resulting in gaps on the range.
166        let mut in_memory_chain =
167            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
168        let db_provider = &self.storage_provider;
169
170        let (start, end) = self.convert_range_bounds(range, || {
171            // the first block is the highest one.
172            in_memory_chain
173                .first()
174                .map(|b| b.number())
175                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
176        });
177
178        if start > end {
179            return Ok(vec![])
180        }
181
182        // Split range into storage_range and in-memory range. If the in-memory range is not
183        // necessary drop it early.
184        //
185        // The last block of `in_memory_chain` is the lowest block number.
186        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
187            Some(lowest_memory_block) if lowest_memory_block <= end => {
188                let highest_memory_block =
189                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
190
191                // Database will for a time overlap with in-memory-chain blocks. In
192                // case of a re-org, it can mean that the database blocks are of a forked chain, and
193                // so, we should prioritize the in-memory overlapped blocks.
194                let in_memory_range =
195                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
196
197                // If requested range is in the middle of the in-memory range, remove the necessary
198                // lowest blocks
199                in_memory_chain.truncate(
200                    in_memory_chain
201                        .len()
202                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
203                );
204
205                let storage_range =
206                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
207
208                (Some((in_memory_chain, in_memory_range)), storage_range)
209            }
210            _ => {
211                // Drop the in-memory chain so we don't hold blocks in memory.
212                drop(in_memory_chain);
213
214                (None, Some(start..=end))
215            }
216        };
217
218        let mut items = Vec::with_capacity((end - start + 1) as usize);
219
220        if let Some(storage_range) = storage_range {
221            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
222            items.append(&mut db_items);
223
224            // The predicate was not met, if the number of items differs from the expected. So, we
225            // return what we have.
226            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
227                return Ok(items)
228            }
229        }
230
231        if let Some((in_memory_chain, in_memory_range)) = in_memory {
232            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
233                debug_assert!(num == block.number());
234                if let Some(item) = map_block_state_item(block, &mut predicate) {
235                    items.push(item);
236                } else {
237                    break
238                }
239            }
240        }
241
242        Ok(items)
243    }
244
245    /// This uses a given [`BlockState`] to initialize a state provider for that block.
246    fn block_state_provider_ref(
247        &self,
248        state: &BlockState<N::Primitives>,
249    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
250        let anchor_hash = state.anchor().hash;
251        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
252        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
253        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
254    }
255
256    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
257    ///
258    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
259    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
260    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
261    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
262        &self,
263        range: impl RangeBounds<BlockNumber>,
264        fetch_from_db: S,
265        fetch_from_block_state: M,
266    ) -> ProviderResult<Vec<R>>
267    where
268        S: FnOnce(
269            &DatabaseProviderRO<N::DB, N>,
270            RangeInclusive<TxNumber>,
271        ) -> ProviderResult<Vec<R>>,
272        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
273    {
274        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
275        let provider = &self.storage_provider;
276
277        // Get the last block number stored in the storage which does NOT overlap with in-memory
278        // chain.
279        let last_database_block_number = in_mem_chain
280            .last()
281            .map(|b| Ok(b.anchor().number))
282            .unwrap_or_else(|| provider.last_block_number())?;
283
284        // Get the next tx number for the last block stored in the storage, which marks the start of
285        // the in-memory state.
286        let last_block_body_index = provider
287            .block_body_indices(last_database_block_number)?
288            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
289        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
290
291        let (start, end) = self.convert_range_bounds(range, || {
292            in_mem_chain
293                .iter()
294                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
295                .sum::<u64>() +
296                last_block_body_index.last_tx_num()
297        });
298
299        if start > end {
300            return Ok(vec![])
301        }
302
303        let mut tx_range = start..=end;
304
305        // If the range is entirely before the first in-memory transaction number, fetch from
306        // storage
307        if *tx_range.end() < in_memory_tx_num {
308            return fetch_from_db(provider, tx_range);
309        }
310
311        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
312
313        // If the range spans storage and memory, get elements from storage first.
314        if *tx_range.start() < in_memory_tx_num {
315            // Determine the range that needs to be fetched from storage.
316            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
317
318            // Set the remaining transaction range for in-memory
319            tx_range = in_memory_tx_num..=*tx_range.end();
320
321            items.extend(fetch_from_db(provider, db_range)?);
322        }
323
324        // Iterate from the lowest block to the highest in-memory chain
325        for block_state in in_mem_chain.iter().rev() {
326            let block_tx_count =
327                block_state.block_ref().recovered_block().body().transactions().len();
328            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
329
330            // If the transaction range start is equal or higher than the next block first
331            // transaction, advance
332            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
333                in_memory_tx_num += block_tx_count as u64;
334                continue
335            }
336
337            // This should only be more than 0 once, in case of a partial range inside a block.
338            let skip = (tx_range.start() - in_memory_tx_num) as usize;
339
340            items.extend(fetch_from_block_state(
341                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
342                block_state,
343            )?);
344
345            in_memory_tx_num += block_tx_count as u64;
346
347            // Break if the range has been fully processed
348            if in_memory_tx_num > *tx_range.end() {
349                break
350            }
351
352            // Set updated range
353            tx_range = in_memory_tx_num..=*tx_range.end();
354        }
355
356        Ok(items)
357    }
358
359    /// Fetches data from either in-memory state or persistent storage by transaction
360    /// [`HashOrNumber`].
361    fn get_in_memory_or_storage_by_tx<S, M, R>(
362        &self,
363        id: HashOrNumber,
364        fetch_from_db: S,
365        fetch_from_block_state: M,
366    ) -> ProviderResult<Option<R>>
367    where
368        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
369        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
370    {
371        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
372        let provider = &self.storage_provider;
373
374        // Get the last block number stored in the database which does NOT overlap with in-memory
375        // chain.
376        let last_database_block_number = in_mem_chain
377            .last()
378            .map(|b| Ok(b.anchor().number))
379            .unwrap_or_else(|| provider.last_block_number())?;
380
381        // Get the next tx number for the last block stored in the database and consider it the
382        // first tx number of the in-memory state
383        let last_block_body_index = provider
384            .block_body_indices(last_database_block_number)?
385            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
386        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
387
388        // If the transaction number is less than the first in-memory transaction number, make a
389        // database lookup
390        if let HashOrNumber::Number(id) = id &&
391            id < in_memory_tx_num
392        {
393            return fetch_from_db(provider)
394        }
395
396        // Iterate from the lowest block to the highest
397        for block_state in in_mem_chain.iter().rev() {
398            let executed_block = block_state.block_ref();
399            let block = executed_block.recovered_block();
400
401            for tx_index in 0..block.body().transactions().len() {
402                match id {
403                    HashOrNumber::Hash(tx_hash) => {
404                        if tx_hash == *block.body().transactions()[tx_index].tx_hash() {
405                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
406                        }
407                    }
408                    HashOrNumber::Number(id) => {
409                        if id == in_memory_tx_num {
410                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
411                        }
412                    }
413                }
414
415                in_memory_tx_num += 1;
416            }
417        }
418
419        // Not found in-memory, so check database.
420        if let HashOrNumber::Hash(_) = id {
421            return fetch_from_db(provider)
422        }
423
424        Ok(None)
425    }
426
427    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
428    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
429        &self,
430        id: BlockHashOrNumber,
431        fetch_from_db: S,
432        fetch_from_block_state: M,
433    ) -> ProviderResult<R>
434    where
435        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
436        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
437    {
438        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
439            return fetch_from_block_state(block_state)
440        }
441        fetch_from_db(&self.storage_provider)
442    }
443
444    /// Consumes the provider and returns a state provider for the specific block hash.
445    pub(crate) fn into_state_provider_at_block_hash(
446        self,
447        block_hash: BlockHash,
448    ) -> ProviderResult<StateProviderBox> {
449        // Resolve block number and verify it's canonical before destructuring self
450        let block_number =
451            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
452        self.ensure_canonical_block(block_number)?;
453
454        let Self { storage_provider, head_block, .. } = self;
455        if let Some(Some(block_state)) =
456            head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
457        {
458            let anchor_hash = block_state.anchor().hash;
459            let block_number = storage_provider
460                .block_number(anchor_hash)?
461                .ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
462            let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
463            return Ok(Box::new(block_state.state_provider(latest_historical)));
464        }
465        storage_provider.try_into_history_at_block(block_number)
466    }
467}
468
469impl<N: ProviderNodeTypes> ConsistentProvider<N> {
470    /// Ensures that the given block number is canonical (synced)
471    ///
472    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
473    /// out of range and would lead to invalid results, mainly during initial sync.
474    ///
475    /// Verifying the `block_number` would be expensive since we need to lookup sync table
476    /// Instead, we ensure that the `block_number` is within the range of the
477    /// [`Self::best_block_number`] which is updated when a block is synced.
478    #[inline]
479    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
480        let latest = self.best_block_number()?;
481        if block_number > latest {
482            Err(ProviderError::HeaderNotFound(block_number.into()))
483        } else {
484            Ok(())
485        }
486    }
487}
488
489impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
490    type Primitives = N::Primitives;
491}
492
493impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
494    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
495        self.storage_provider.static_file_provider()
496    }
497
498    fn get_static_file_writer(
499        &self,
500        block: BlockNumber,
501        segment: StaticFileSegment,
502    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
503        self.storage_provider.get_static_file_writer(block, segment)
504    }
505}
506
507impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
508    type Header = HeaderTy<N>;
509
510    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
511        self.get_in_memory_or_storage_by_block(
512            block_hash.into(),
513            |db_provider| db_provider.header(block_hash),
514            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
515        )
516    }
517
518    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
519        self.get_in_memory_or_storage_by_block(
520            num.into(),
521            |db_provider| db_provider.header_by_number(num),
522            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
523        )
524    }
525
526    fn headers_range(
527        &self,
528        range: impl RangeBounds<BlockNumber>,
529    ) -> ProviderResult<Vec<Self::Header>> {
530        self.get_in_memory_or_storage_by_block_range_while(
531            range,
532            |db_provider, range, _| db_provider.headers_range(range),
533            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
534            |_| true,
535        )
536    }
537
538    fn sealed_header(
539        &self,
540        number: BlockNumber,
541    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
542        self.get_in_memory_or_storage_by_block(
543            number.into(),
544            |db_provider| db_provider.sealed_header(number),
545            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
546        )
547    }
548
549    fn sealed_headers_range(
550        &self,
551        range: impl RangeBounds<BlockNumber>,
552    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
553        self.get_in_memory_or_storage_by_block_range_while(
554            range,
555            |db_provider, range, _| db_provider.sealed_headers_range(range),
556            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
557            |_| true,
558        )
559    }
560
561    fn sealed_headers_while(
562        &self,
563        range: impl RangeBounds<BlockNumber>,
564        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
565    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
566        self.get_in_memory_or_storage_by_block_range_while(
567            range,
568            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
569            |block_state, predicate| {
570                let header = block_state.block_ref().recovered_block().sealed_header();
571                predicate(header).then(|| header.clone())
572            },
573            predicate,
574        )
575    }
576}
577
578impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
579    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
580        self.get_in_memory_or_storage_by_block(
581            number.into(),
582            |db_provider| db_provider.block_hash(number),
583            |block_state| Ok(Some(block_state.hash())),
584        )
585    }
586
587    fn canonical_hashes_range(
588        &self,
589        start: BlockNumber,
590        end: BlockNumber,
591    ) -> ProviderResult<Vec<B256>> {
592        self.get_in_memory_or_storage_by_block_range_while(
593            start..end,
594            |db_provider, inclusive_range, _| {
595                db_provider
596                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
597            },
598            |block_state, _| Some(block_state.hash()),
599            |_| true,
600        )
601    }
602}
603
604impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
605    fn chain_info(&self) -> ProviderResult<ChainInfo> {
606        let best_number = self.best_block_number()?;
607        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
608    }
609
610    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
611        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
612    }
613
614    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
615        self.storage_provider.last_block_number()
616    }
617
618    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
619        self.get_in_memory_or_storage_by_block(
620            hash.into(),
621            |db_provider| db_provider.block_number(hash),
622            |block_state| Ok(Some(block_state.number())),
623        )
624    }
625}
626
627impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
628    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
629        Ok(self.canonical_in_memory_state.pending_block_num_hash())
630    }
631
632    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
633        Ok(self.canonical_in_memory_state.get_safe_num_hash())
634    }
635
636    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
637        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
638    }
639}
640
641impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
642    type Block = BlockTy<N>;
643
644    fn find_block_by_hash(
645        &self,
646        hash: B256,
647        source: BlockSource,
648    ) -> ProviderResult<Option<Self::Block>> {
649        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
650            let Some(block) = self.get_in_memory_or_storage_by_block(
651                hash.into(),
652                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
653                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
654            )?
655        {
656            return Ok(Some(block))
657        }
658
659        if matches!(source, BlockSource::Pending | BlockSource::Any) {
660            return Ok(self
661                .canonical_in_memory_state
662                .pending_block()
663                .filter(|b| b.hash() == hash)
664                .map(|b| b.into_block()))
665        }
666
667        Ok(None)
668    }
669
670    fn find_sealed_or_recovered_block(
671        &self,
672        hash: B256,
673        source: BlockSource,
674    ) -> ProviderResult<Option<SealedOrRecoveredBlock<Self::Block>>> {
675        if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
676            let Some(block) = self.get_in_memory_or_storage_by_block(
677                hash.into(),
678                |db_provider| {
679                    db_provider.find_sealed_or_recovered_block(hash, BlockSource::Canonical)
680                },
681                |block_state| {
682                    Ok(Some(SealedOrRecoveredBlock::recovered_arc(Arc::clone(
683                        &block_state.block_ref().recovered_block,
684                    ))))
685                },
686            )?
687        {
688            return Ok(Some(block))
689        }
690
691        if matches!(source, BlockSource::Pending | BlockSource::Any) &&
692            let Some(block_state) = self.canonical_in_memory_state.pending_state()
693        {
694            let recovered_block = Arc::clone(&block_state.block_ref().recovered_block);
695            if recovered_block.hash() == hash {
696                return Ok(Some(SealedOrRecoveredBlock::recovered_arc(recovered_block)))
697            }
698        }
699
700        Ok(None)
701    }
702
703    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
704        self.get_in_memory_or_storage_by_block(
705            id,
706            |db_provider| db_provider.block(id),
707            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
708        )
709    }
710
711    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
712        Ok(self.canonical_in_memory_state.pending_recovered_block())
713    }
714
715    fn pending_block_and_receipts(
716        &self,
717    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
718        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
719    }
720
721    /// Returns the block with senders with matching number or hash from database.
722    ///
723    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
724    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
725    ///
726    /// Returns `None` if block is not found.
727    fn recovered_block(
728        &self,
729        id: BlockHashOrNumber,
730        transaction_kind: TransactionVariant,
731    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
732        self.get_in_memory_or_storage_by_block(
733            id,
734            |db_provider| db_provider.recovered_block(id, transaction_kind),
735            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
736        )
737    }
738
739    fn sealed_block_with_senders(
740        &self,
741        id: BlockHashOrNumber,
742        transaction_kind: TransactionVariant,
743    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
744        self.get_in_memory_or_storage_by_block(
745            id,
746            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
747            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
748        )
749    }
750
751    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
752        self.get_in_memory_or_storage_by_block_range_while(
753            range,
754            |db_provider, range, _| db_provider.block_range(range),
755            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
756            |_| true,
757        )
758    }
759
760    fn block_with_senders_range(
761        &self,
762        range: RangeInclusive<BlockNumber>,
763    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
764        self.get_in_memory_or_storage_by_block_range_while(
765            range,
766            |db_provider, range, _| db_provider.block_with_senders_range(range),
767            |block_state, _| Some(block_state.block().recovered_block().clone()),
768            |_| true,
769        )
770    }
771
772    fn recovered_block_range(
773        &self,
774        range: RangeInclusive<BlockNumber>,
775    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
776        self.get_in_memory_or_storage_by_block_range_while(
777            range,
778            |db_provider, range, _| db_provider.recovered_block_range(range),
779            |block_state, _| Some(block_state.block().recovered_block().clone()),
780            |_| true,
781        )
782    }
783
784    fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
785        self.get_in_memory_or_storage_by_tx(
786            id.into(),
787            |db_provider| db_provider.block_by_transaction_id(id),
788            |_, _, block_state| Ok(Some(block_state.number())),
789        )
790    }
791}
792
793impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
794    type Transaction = TxTy<N>;
795
796    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
797        self.get_in_memory_or_storage_by_tx(
798            tx_hash.into(),
799            |db_provider| db_provider.transaction_id(tx_hash),
800            |_, tx_number, _| Ok(Some(tx_number)),
801        )
802    }
803
804    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
805        self.get_in_memory_or_storage_by_tx(
806            id.into(),
807            |provider| provider.transaction_by_id(id),
808            |tx_index, _, block_state| {
809                Ok(block_state
810                    .block_ref()
811                    .recovered_block()
812                    .body()
813                    .transactions()
814                    .get(tx_index)
815                    .cloned())
816            },
817        )
818    }
819
820    fn transaction_by_id_unhashed(
821        &self,
822        id: TxNumber,
823    ) -> ProviderResult<Option<Self::Transaction>> {
824        self.get_in_memory_or_storage_by_tx(
825            id.into(),
826            |provider| provider.transaction_by_id_unhashed(id),
827            |tx_index, _, block_state| {
828                Ok(block_state
829                    .block_ref()
830                    .recovered_block()
831                    .body()
832                    .transactions()
833                    .get(tx_index)
834                    .cloned())
835            },
836        )
837    }
838
839    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
840        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
841            return Ok(Some(tx))
842        }
843
844        self.storage_provider.transaction_by_hash(hash)
845    }
846
847    fn transaction_by_hash_with_meta(
848        &self,
849        tx_hash: TxHash,
850    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
851        if let Some((tx, meta)) =
852            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
853        {
854            return Ok(Some((tx, meta)))
855        }
856
857        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
858    }
859
860    fn transactions_by_block(
861        &self,
862        id: BlockHashOrNumber,
863    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
864        self.get_in_memory_or_storage_by_block(
865            id,
866            |provider| provider.transactions_by_block(id),
867            |block_state| {
868                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
869            },
870        )
871    }
872
873    fn transactions_by_block_range(
874        &self,
875        range: impl RangeBounds<BlockNumber>,
876    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
877        self.get_in_memory_or_storage_by_block_range_while(
878            range,
879            |db_provider, range, _| db_provider.transactions_by_block_range(range),
880            |block_state, _| {
881                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
882            },
883            |_| true,
884        )
885    }
886
887    fn transactions_by_tx_range(
888        &self,
889        range: impl RangeBounds<TxNumber>,
890    ) -> ProviderResult<Vec<Self::Transaction>> {
891        self.get_in_memory_or_storage_by_tx_range(
892            range,
893            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
894            |index_range, block_state| {
895                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
896                    .to_vec())
897            },
898        )
899    }
900
901    fn senders_by_tx_range(
902        &self,
903        range: impl RangeBounds<TxNumber>,
904    ) -> ProviderResult<Vec<Address>> {
905        self.get_in_memory_or_storage_by_tx_range(
906            range,
907            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
908            |index_range, block_state| {
909                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
910            },
911        )
912    }
913
914    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
915        self.get_in_memory_or_storage_by_tx(
916            id.into(),
917            |provider| provider.transaction_sender(id),
918            |tx_index, _, block_state| {
919                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
920            },
921        )
922    }
923}
924
925impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
926    type Receipt = ReceiptTy<N>;
927
928    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
929        self.get_in_memory_or_storage_by_tx(
930            id.into(),
931            |provider| provider.receipt(id),
932            |tx_index, _, block_state| {
933                Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
934            },
935        )
936    }
937
938    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
939        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
940            let executed_block = block_state.block_ref();
941            let block = executed_block.recovered_block();
942            let receipts = block_state.executed_block_receipts_ref();
943
944            // assuming 1:1 correspondence between transactions and receipts
945            debug_assert_eq!(
946                block.body().transactions().len(),
947                receipts.len(),
948                "Mismatch between transaction and receipt count"
949            );
950
951            if let Some(tx_index) =
952                block.body().transactions_iter().position(|tx| *tx.tx_hash() == hash)
953            {
954                // safe to use tx_index for receipts due to 1:1 correspondence
955                return Ok(receipts.get(tx_index).cloned());
956            }
957        }
958
959        self.storage_provider.receipt_by_hash(hash)
960    }
961
962    fn receipts_by_block(
963        &self,
964        block: BlockHashOrNumber,
965    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
966        self.get_in_memory_or_storage_by_block(
967            block,
968            |db_provider| db_provider.receipts_by_block(block),
969            |block_state| Ok(Some(block_state.executed_block_receipts())),
970        )
971    }
972
973    fn receipts_by_tx_range(
974        &self,
975        range: impl RangeBounds<TxNumber>,
976    ) -> ProviderResult<Vec<Self::Receipt>> {
977        self.get_in_memory_or_storage_by_tx_range(
978            range,
979            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
980            |index_range, block_state| {
981                Ok(block_state.executed_block_receipts_ref()[index_range].to_vec())
982            },
983        )
984    }
985
986    fn receipts_by_block_range(
987        &self,
988        block_range: RangeInclusive<BlockNumber>,
989    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
990        self.storage_provider.receipts_by_block_range(block_range)
991    }
992}
993
994impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
995    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
996        match block {
997            BlockId::Hash(rpc_block_hash) => {
998                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
999                if receipts.is_none() &&
1000                    !rpc_block_hash.require_canonical.unwrap_or(false) &&
1001                    let Some(state) = self
1002                        .head_block
1003                        .as_ref()
1004                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1005                {
1006                    receipts = Some(state.executed_block_receipts());
1007                }
1008                Ok(receipts)
1009            }
1010            BlockId::Number(num_tag) => match num_tag {
1011                BlockNumberOrTag::Pending => Ok(self
1012                    .canonical_in_memory_state
1013                    .pending_state()
1014                    .map(|block_state| block_state.executed_block_receipts())),
1015                _ => {
1016                    if let Some(num) = self.convert_block_number(num_tag)? {
1017                        self.receipts_by_block(num.into())
1018                    } else {
1019                        Ok(None)
1020                    }
1021                }
1022            },
1023        }
1024    }
1025}
1026
1027impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1028    fn block_body_indices(
1029        &self,
1030        number: BlockNumber,
1031    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1032        self.get_in_memory_or_storage_by_block(
1033            number.into(),
1034            |db_provider| db_provider.block_body_indices(number),
1035            |block_state| {
1036                // Find the last block indices on database
1037                let last_storage_block_number = block_state.anchor().number;
1038                let mut stored_indices = self
1039                    .storage_provider
1040                    .block_body_indices(last_storage_block_number)?
1041                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1042
1043                // Prepare our block indices
1044                stored_indices.first_tx_num = stored_indices.next_tx_num();
1045                stored_indices.tx_count = 0;
1046
1047                // Iterate from the lowest block in memory until our target block
1048                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1049                    let block_tx_count =
1050                        state.block_ref().recovered_block().body().transactions().len() as u64;
1051                    if state.block_ref().recovered_block().number() == number {
1052                        stored_indices.tx_count = block_tx_count;
1053                    } else {
1054                        stored_indices.first_tx_num += block_tx_count;
1055                    }
1056                }
1057
1058                Ok(Some(stored_indices))
1059            },
1060        )
1061    }
1062
1063    fn block_body_indices_range(
1064        &self,
1065        range: RangeInclusive<BlockNumber>,
1066    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1067        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1068    }
1069}
1070
1071impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1072    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1073        self.storage_provider.get_stage_checkpoint(id)
1074    }
1075
1076    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1077        self.storage_provider.get_stage_checkpoint_progress(id)
1078    }
1079
1080    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1081        self.storage_provider.get_all_checkpoints()
1082    }
1083}
1084
1085impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1086    fn get_prune_checkpoint(
1087        &self,
1088        segment: PruneSegment,
1089    ) -> ProviderResult<Option<PruneCheckpoint>> {
1090        self.storage_provider.get_prune_checkpoint(segment)
1091    }
1092
1093    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1094        self.storage_provider.get_prune_checkpoints()
1095    }
1096}
1097
1098impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1099    type ChainSpec = N::ChainSpec;
1100
1101    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1102        ChainSpecProvider::chain_spec(&self.storage_provider)
1103    }
1104}
1105
1106impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1107    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1108        match id {
1109            BlockId::Number(num) => self.block_by_number_or_tag(num),
1110            BlockId::Hash(hash) => {
1111                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1112                // so not at the provider level?
1113                // if we decide to do this at a higher level, then we can make this an automatic
1114                // trait impl
1115                if Some(true) == hash.require_canonical {
1116                    // check the database, canonical blocks are only stored in the database
1117                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1118                } else {
1119                    self.block_by_hash(hash.block_hash)
1120                }
1121            }
1122        }
1123    }
1124
1125    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1126        Ok(match id {
1127            BlockNumberOrTag::Latest => {
1128                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1129            }
1130            BlockNumberOrTag::Finalized => {
1131                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1132            }
1133            BlockNumberOrTag::Safe => {
1134                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1135            }
1136            BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1137            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1138
1139            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1140        })
1141    }
1142
1143    fn sealed_header_by_number_or_tag(
1144        &self,
1145        id: BlockNumberOrTag,
1146    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1147        match id {
1148            BlockNumberOrTag::Latest => {
1149                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1150            }
1151            BlockNumberOrTag::Finalized => {
1152                Ok(self.canonical_in_memory_state.get_finalized_header())
1153            }
1154            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1155            BlockNumberOrTag::Earliest => self
1156                .header_by_number(self.earliest_block_number()?)?
1157                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1158            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1159            BlockNumberOrTag::Number(num) => self
1160                .header_by_number(num)?
1161                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1162        }
1163    }
1164
1165    fn sealed_header_by_id(
1166        &self,
1167        id: BlockId,
1168    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1169        Ok(match id {
1170            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1171            BlockId::Hash(hash) => self
1172                .header(hash.block_hash)?
1173                .map(|header| SealedHeader::new(header, hash.block_hash)),
1174        })
1175    }
1176
1177    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1178        Ok(match id {
1179            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1180            BlockId::Hash(hash) => self.header(hash.block_hash)?,
1181        })
1182    }
1183}
1184
1185impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1186    fn storage_changeset(
1187        &self,
1188        block_number: BlockNumber,
1189    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1190        if let Some(state) =
1191            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1192        {
1193            let changesets = state
1194                .block()
1195                .execution_output
1196                .state
1197                .reverts
1198                .to_plain_state_reverts()
1199                .storage
1200                .into_iter()
1201                .flatten()
1202                .flat_map(|revert: PlainStorageRevert| {
1203                    revert.storage_revert.into_iter().map(move |(key, value)| {
1204                        let plain_key = B256::from(key.to_be_bytes());
1205                        (
1206                            BlockNumberAddress((block_number, revert.address)),
1207                            StorageEntry { key: plain_key, value: value.to_previous_value() },
1208                        )
1209                    })
1210                })
1211                .collect();
1212            Ok(changesets)
1213        } else {
1214            // Perform checks on whether or not changesets exist for the block.
1215
1216            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1217            let storage_history_exists = self
1218                .storage_provider
1219                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1220                .and_then(|checkpoint| {
1221                    // return true if the block number is ahead of the prune checkpoint.
1222                    //
1223                    // The checkpoint stores the highest pruned block number, so we should make
1224                    // sure the block_number is strictly greater.
1225                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1226                })
1227                .unwrap_or(true);
1228
1229            if !storage_history_exists {
1230                return Err(ProviderError::StateAtBlockPruned(block_number))
1231            }
1232
1233            self.storage_provider.storage_changeset(block_number)
1234        }
1235    }
1236
1237    fn get_storage_before_block(
1238        &self,
1239        block_number: BlockNumber,
1240        address: Address,
1241        storage_key: B256,
1242    ) -> ProviderResult<Option<StorageEntry>> {
1243        if let Some(state) =
1244            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1245        {
1246            let changeset = state
1247                .block_ref()
1248                .execution_output
1249                .state
1250                .reverts
1251                .to_plain_state_reverts()
1252                .storage
1253                .into_iter()
1254                .flatten()
1255                .find_map(|revert: PlainStorageRevert| {
1256                    if revert.address != address {
1257                        return None
1258                    }
1259                    revert.storage_revert.into_iter().find_map(|(key, value)| {
1260                        let plain_key = B256::from(key.to_be_bytes());
1261                        (plain_key == storage_key).then(|| StorageEntry {
1262                            key: plain_key,
1263                            value: value.to_previous_value(),
1264                        })
1265                    })
1266                });
1267            Ok(changeset)
1268        } else {
1269            let storage_history_exists = self
1270                .storage_provider
1271                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1272                .and_then(|checkpoint| {
1273                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1274                })
1275                .unwrap_or(true);
1276
1277            if !storage_history_exists {
1278                return Err(ProviderError::StateAtBlockPruned(block_number))
1279            }
1280
1281            self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1282        }
1283    }
1284
1285    fn storage_changesets_range(
1286        &self,
1287        range: impl RangeBounds<BlockNumber>,
1288    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1289        let range = to_range(range);
1290        let mut changesets = Vec::new();
1291        let database_start = range.start;
1292        let mut database_end = range.end;
1293
1294        if let Some(head_block) = &self.head_block {
1295            database_end = head_block.anchor().number;
1296
1297            for state in head_block.chain() {
1298                let block_changesets = state
1299                    .block_ref()
1300                    .execution_output
1301                    .state
1302                    .reverts
1303                    .to_plain_state_reverts()
1304                    .storage
1305                    .into_iter()
1306                    .flatten()
1307                    .flat_map(|revert: PlainStorageRevert| {
1308                        revert.storage_revert.into_iter().map(move |(key, value)| {
1309                            let plain_key = B256::from(key.to_be_bytes());
1310                            (
1311                                BlockNumberAddress((state.number(), revert.address)),
1312                                StorageEntry { key: plain_key, value: value.to_previous_value() },
1313                            )
1314                        })
1315                    });
1316
1317                changesets.extend(block_changesets);
1318            }
1319        }
1320
1321        if database_start < database_end {
1322            let storage_history_exists = self
1323                .storage_provider
1324                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1325                .and_then(|checkpoint| {
1326                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1327                })
1328                .unwrap_or(true);
1329
1330            if !storage_history_exists {
1331                return Err(ProviderError::StateAtBlockPruned(database_start))
1332            }
1333
1334            let db_changesets = self
1335                .storage_provider
1336                .storage_changesets_range(database_start..=database_end - 1)?;
1337            changesets.extend(db_changesets);
1338        }
1339
1340        changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1341
1342        Ok(changesets)
1343    }
1344}
1345
1346impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1347    fn account_block_changeset(
1348        &self,
1349        block_number: BlockNumber,
1350    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1351        if let Some(state) =
1352            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1353        {
1354            let changesets = state
1355                .block_ref()
1356                .execution_output
1357                .state
1358                .reverts
1359                .to_plain_state_reverts()
1360                .accounts
1361                .into_iter()
1362                .flatten()
1363                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1364                .collect();
1365            Ok(changesets)
1366        } else {
1367            // Perform checks on whether or not changesets exist for the block.
1368
1369            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1370            let account_history_exists = self
1371                .storage_provider
1372                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1373                .and_then(|checkpoint| {
1374                    // return true if the block number is ahead of the prune checkpoint.
1375                    //
1376                    // The checkpoint stores the highest pruned block number, so we should make
1377                    // sure the block_number is strictly greater.
1378                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1379                })
1380                .unwrap_or(true);
1381
1382            if !account_history_exists {
1383                return Err(ProviderError::StateAtBlockPruned(block_number))
1384            }
1385
1386            self.storage_provider.account_block_changeset(block_number)
1387        }
1388    }
1389
1390    fn get_account_before_block(
1391        &self,
1392        block_number: BlockNumber,
1393        address: Address,
1394    ) -> ProviderResult<Option<AccountBeforeTx>> {
1395        if let Some(state) =
1396            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1397        {
1398            // Search in-memory state for the account changeset
1399            let changeset = state
1400                .block_ref()
1401                .execution_output
1402                .state
1403                .reverts
1404                .to_plain_state_reverts()
1405                .accounts
1406                .into_iter()
1407                .flatten()
1408                .find(|(addr, _)| addr == &address)
1409                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1410            Ok(changeset)
1411        } else {
1412            // Perform checks on whether or not changesets exist for the block.
1413            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1414            let account_history_exists = self
1415                .storage_provider
1416                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1417                .and_then(|checkpoint| {
1418                    // return true if the block number is ahead of the prune checkpoint.
1419                    //
1420                    // The checkpoint stores the highest pruned block number, so we should make
1421                    // sure the block_number is strictly greater.
1422                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1423                })
1424                .unwrap_or(true);
1425
1426            if !account_history_exists {
1427                return Err(ProviderError::StateAtBlockPruned(block_number))
1428            }
1429
1430            // Delegate to the storage provider for database lookups
1431            self.storage_provider.get_account_before_block(block_number, address)
1432        }
1433    }
1434
1435    fn account_changesets_range(
1436        &self,
1437        range: impl core::ops::RangeBounds<BlockNumber>,
1438    ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1439        let range = to_range(range);
1440        let mut changesets = Vec::new();
1441        let database_start = range.start;
1442        let mut database_end = range.end;
1443
1444        // Check which blocks in the range are in memory
1445        if let Some(head_block) = &self.head_block {
1446            // the anchor is the end of the db range
1447            database_end = head_block.anchor().number;
1448
1449            for state in head_block.chain() {
1450                // found block in memory, collect its changesets
1451                let block_changesets = state
1452                    .block_ref()
1453                    .execution_output
1454                    .state
1455                    .reverts
1456                    .to_plain_state_reverts()
1457                    .accounts
1458                    .into_iter()
1459                    .flatten()
1460                    .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1461
1462                for changeset in block_changesets {
1463                    changesets.push((state.number(), changeset));
1464                }
1465            }
1466        }
1467
1468        // get changesets from database for remaining blocks
1469        if database_start < database_end {
1470            // check if account history is pruned for these blocks
1471            let account_history_exists = self
1472                .storage_provider
1473                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1474                .and_then(|checkpoint| {
1475                    checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1476                })
1477                .unwrap_or(true);
1478
1479            if !account_history_exists {
1480                return Err(ProviderError::StateAtBlockPruned(database_start))
1481            }
1482
1483            let db_changesets =
1484                self.storage_provider.account_changesets_range(database_start..database_end)?;
1485            changesets.extend(db_changesets);
1486        }
1487
1488        changesets.sort_by_key(|(block_num, _)| *block_num);
1489
1490        Ok(changesets)
1491    }
1492}
1493
1494impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1495    /// Get basic account information.
1496    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1497        // use latest state provider
1498        let state_provider = self.latest_ref()?;
1499        state_provider.basic_account(address)
1500    }
1501}
1502
1503impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1504    type Receipt = ReceiptTy<N>;
1505
1506    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1507    ///
1508    /// If data for the block does not exist, this will return [`None`].
1509    ///
1510    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1511    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1512    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1513    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1514    /// first place.
1515    fn get_state(
1516        &self,
1517        block: BlockNumber,
1518    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1519        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1520            let state = state.block_ref().execution_outcome().clone();
1521            Ok(Some(ExecutionOutcome::from((state, block))))
1522        } else {
1523            self.storage_provider.get_state(block)
1524        }
1525    }
1526}
1527
1528#[cfg(test)]
1529mod tests {
1530    use crate::{
1531        providers::blockchain_provider::BlockchainProvider,
1532        test_utils::create_test_provider_factory, BlockWriter,
1533    };
1534    use alloy_eips::BlockHashOrNumber;
1535    use alloy_primitives::B256;
1536    use itertools::Itertools;
1537    use rand::Rng;
1538    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1539    use reth_db_api::models::AccountBeforeTx;
1540    use reth_ethereum_primitives::Block;
1541    use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1542    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1543    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader, StateReader};
1544    use reth_testing_utils::generators::{
1545        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1546    };
1547    use revm_database::BundleState;
1548    use std::{
1549        ops::{Bound, Range, RangeBounds},
1550        sync::Arc,
1551    };
1552
1553    const TEST_BLOCKS_COUNT: usize = 5;
1554
1555    fn random_blocks(
1556        rng: &mut impl Rng,
1557        database_blocks: usize,
1558        in_memory_blocks: usize,
1559        requests_count: Option<Range<u8>>,
1560        withdrawals_count: Option<Range<u8>>,
1561        tx_count: impl RangeBounds<u8>,
1562    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1563        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1564
1565        let tx_start = match tx_count.start_bound() {
1566            Bound::Included(&n) | Bound::Excluded(&n) => n,
1567            Bound::Unbounded => u8::MIN,
1568        };
1569        let tx_end = match tx_count.end_bound() {
1570            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1571            Bound::Unbounded => u8::MAX,
1572        };
1573
1574        let blocks = random_block_range(
1575            rng,
1576            0..=block_range,
1577            BlockRangeParams {
1578                parent: Some(B256::ZERO),
1579                tx_count: tx_start..tx_end,
1580                requests_count,
1581                withdrawals_count,
1582            },
1583        );
1584        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1585        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1586    }
1587
1588    #[test]
1589    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1590        // Initialize random number generator and provider factory
1591        let mut rng = generators::rng();
1592        let factory = create_test_provider_factory();
1593
1594        // Generate 10 random blocks and split into database and in-memory blocks
1595        let blocks = random_block_range(
1596            &mut rng,
1597            0..=10,
1598            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1599        );
1600        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1601
1602        // Insert first 5 blocks into the database
1603        let provider_rw = factory.provider_rw()?;
1604        for block in database_blocks {
1605            provider_rw.insert_block(
1606                &block.clone().try_recover().expect("failed to seal block with senders"),
1607            )?;
1608        }
1609        provider_rw.commit()?;
1610
1611        // Create a new provider
1612        let provider = BlockchainProvider::new(factory)?;
1613        let consistent_provider = provider.consistent_provider()?;
1614
1615        // Useful blocks
1616        let first_db_block = database_blocks.first().unwrap();
1617        let first_in_mem_block = in_memory_blocks.first().unwrap();
1618        let last_in_mem_block = in_memory_blocks.last().unwrap();
1619
1620        // No block in memory before setting in memory state
1621        assert_eq!(
1622            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1623            None
1624        );
1625        assert!(consistent_provider
1626            .find_sealed_or_recovered_block(first_in_mem_block.hash(), BlockSource::Any)?
1627            .is_none());
1628        assert_eq!(
1629            consistent_provider
1630                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1631            None
1632        );
1633        // No pending block in memory
1634        assert_eq!(
1635            consistent_provider
1636                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1637            None
1638        );
1639
1640        // Insert first block into the in-memory state
1641        let in_memory_block_senders =
1642            first_in_mem_block.senders().expect("failed to recover senders");
1643        let chain = NewCanonicalChain::Commit {
1644            new: vec![ExecutedBlock {
1645                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1646                    first_in_mem_block.clone(),
1647                    in_memory_block_senders,
1648                )),
1649                ..Default::default()
1650            }],
1651        };
1652        consistent_provider.canonical_in_memory_state.update_chain(chain);
1653        let consistent_provider = provider.consistent_provider()?;
1654
1655        // Now the block should be found in memory
1656        assert_eq!(
1657            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1658            Some(first_in_mem_block.clone().into_block())
1659        );
1660        let block = consistent_provider
1661            .find_sealed_or_recovered_block(first_in_mem_block.hash(), BlockSource::Any)?
1662            .expect("in-memory block should be found");
1663        assert_eq!(block.sealed_block(), first_in_mem_block);
1664        assert!(block.recovered_block().is_some());
1665
1666        assert_eq!(
1667            consistent_provider
1668                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1669            Some(first_in_mem_block.clone().into_block())
1670        );
1671        let block = consistent_provider
1672            .find_sealed_or_recovered_block(first_in_mem_block.hash(), BlockSource::Canonical)?
1673            .expect("canonical in-memory block should be found");
1674        assert_eq!(block.sealed_block(), first_in_mem_block);
1675        assert!(block.recovered_block().is_some());
1676
1677        // Find the first block in database by hash
1678        assert_eq!(
1679            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1680            Some(first_db_block.clone().into_block())
1681        );
1682        let block = consistent_provider
1683            .find_sealed_or_recovered_block(first_db_block.hash(), BlockSource::Any)?
1684            .expect("database block should be found");
1685        assert_eq!(block.sealed_block(), first_db_block);
1686        assert!(block.recovered_block().is_none());
1687
1688        assert_eq!(
1689            consistent_provider
1690                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1691            Some(first_db_block.clone().into_block())
1692        );
1693        let block = consistent_provider
1694            .find_sealed_or_recovered_block(first_db_block.hash(), BlockSource::Canonical)?
1695            .expect("canonical database block should be found");
1696        assert_eq!(block.sealed_block(), first_db_block);
1697        assert!(block.recovered_block().is_none());
1698
1699        // No pending block in database
1700        assert_eq!(
1701            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1702            None
1703        );
1704        assert!(consistent_provider
1705            .find_sealed_or_recovered_block(first_db_block.hash(), BlockSource::Pending)?
1706            .is_none());
1707
1708        // Insert the last block into the pending state
1709        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1710            recovered_block: Arc::new(RecoveredBlock::new_sealed(
1711                last_in_mem_block.clone(),
1712                Default::default(),
1713            )),
1714            ..Default::default()
1715        });
1716
1717        // Now the last block should be found in memory
1718        assert_eq!(
1719            consistent_provider
1720                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1721            Some(last_in_mem_block.clone_block())
1722        );
1723        let block = consistent_provider
1724            .find_sealed_or_recovered_block(last_in_mem_block.hash(), BlockSource::Pending)?
1725            .expect("pending block should be found");
1726        assert_eq!(block.sealed_block(), last_in_mem_block);
1727        assert!(block.recovered_block().is_some());
1728
1729        Ok(())
1730    }
1731
1732    #[test]
1733    fn test_block_reader_block() -> eyre::Result<()> {
1734        // Initialize random number generator and provider factory
1735        let mut rng = generators::rng();
1736        let factory = create_test_provider_factory();
1737
1738        // Generate 10 random blocks and split into database and in-memory blocks
1739        let blocks = random_block_range(
1740            &mut rng,
1741            0..=10,
1742            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1743        );
1744        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1745
1746        // Insert first 5 blocks into the database
1747        let provider_rw = factory.provider_rw()?;
1748        for block in database_blocks {
1749            provider_rw.insert_block(
1750                &block.clone().try_recover().expect("failed to seal block with senders"),
1751            )?;
1752        }
1753        provider_rw.commit()?;
1754
1755        // Create a new provider
1756        let provider = BlockchainProvider::new(factory)?;
1757        let consistent_provider = provider.consistent_provider()?;
1758
1759        // First in memory block
1760        let first_in_mem_block = in_memory_blocks.first().unwrap();
1761        // First database block
1762        let first_db_block = database_blocks.first().unwrap();
1763
1764        // First in memory block should not be found yet as not integrated to the in-memory state
1765        assert_eq!(
1766            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1767            None
1768        );
1769        assert_eq!(
1770            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1771            None
1772        );
1773
1774        // Insert first block into the in-memory state
1775        let in_memory_block_senders =
1776            first_in_mem_block.senders().expect("failed to recover senders");
1777        let chain = NewCanonicalChain::Commit {
1778            new: vec![ExecutedBlock {
1779                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1780                    first_in_mem_block.clone(),
1781                    in_memory_block_senders,
1782                )),
1783                ..Default::default()
1784            }],
1785        };
1786        consistent_provider.canonical_in_memory_state.update_chain(chain);
1787
1788        let consistent_provider = provider.consistent_provider()?;
1789
1790        // First in memory block should be found
1791        assert_eq!(
1792            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1793            Some(first_in_mem_block.clone().into_block())
1794        );
1795        assert_eq!(
1796            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1797            Some(first_in_mem_block.clone().into_block())
1798        );
1799
1800        // First database block should be found
1801        assert_eq!(
1802            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1803            Some(first_db_block.clone().into_block())
1804        );
1805        assert_eq!(
1806            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1807            Some(first_db_block.clone().into_block())
1808        );
1809
1810        Ok(())
1811    }
1812
1813    #[test]
1814    fn test_changeset_reader() -> eyre::Result<()> {
1815        let mut rng = generators::rng();
1816
1817        let (database_blocks, in_memory_blocks) =
1818            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1819
1820        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1821        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1822        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1823
1824        let accounts = random_eoa_accounts(&mut rng, 2);
1825
1826        let (database_changesets, database_state) = random_changeset_range(
1827            &mut rng,
1828            &database_blocks,
1829            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1830            0..0,
1831            0..0,
1832        );
1833        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1834            &mut rng,
1835            &in_memory_blocks,
1836            database_state
1837                .iter()
1838                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1839            0..0,
1840            0..0,
1841        );
1842
1843        let factory = create_test_provider_factory();
1844
1845        let provider_rw = factory.provider_rw()?;
1846        provider_rw.append_blocks_with_state(
1847            database_blocks
1848                .into_iter()
1849                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1850                .collect(),
1851            &ExecutionOutcome {
1852                bundle: BundleState::new(
1853                    database_state.into_iter().map(|(address, (account, _))| {
1854                        (address, None, Some(account.into()), Default::default())
1855                    }),
1856                    database_changesets.iter().map(|block_changesets| {
1857                        block_changesets.iter().map(|(address, account, _)| {
1858                            (*address, Some(Some((*account).into())), [])
1859                        })
1860                    }),
1861                    Vec::new(),
1862                ),
1863                first_block: first_database_block,
1864                ..Default::default()
1865            },
1866            Default::default(),
1867        )?;
1868        provider_rw.commit()?;
1869
1870        let provider = BlockchainProvider::new(factory)?;
1871
1872        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1873        let chain = NewCanonicalChain::Commit {
1874            new: vec![in_memory_blocks
1875                .first()
1876                .map(|block| {
1877                    let senders = block.senders().expect("failed to recover senders");
1878                    ExecutedBlock {
1879                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
1880                            block.clone(),
1881                            senders,
1882                        )),
1883                        execution_output: Arc::new(BlockExecutionOutput {
1884                            state: BundleState::new(
1885                                in_memory_state.into_iter().map(|(address, (account, _))| {
1886                                    (address, None, Some(account.into()), Default::default())
1887                                }),
1888                                [in_memory_changesets.iter().map(|(address, account, _)| {
1889                                    (*address, Some(Some((*account).into())), Vec::new())
1890                                })],
1891                                [],
1892                            ),
1893                            result: BlockExecutionResult {
1894                                receipts: Default::default(),
1895                                requests: Default::default(),
1896                                gas_used: 0,
1897                                blob_gas_used: 0,
1898                            },
1899                        }),
1900                        ..Default::default()
1901                    }
1902                })
1903                .unwrap()],
1904        };
1905        provider.canonical_in_memory_state.update_chain(chain);
1906
1907        let consistent_provider = provider.consistent_provider()?;
1908
1909        assert_eq!(
1910            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1911            database_changesets
1912                .into_iter()
1913                .next_back()
1914                .unwrap()
1915                .into_iter()
1916                .sorted_by_key(|(address, _, _)| *address)
1917                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1918                .collect::<Vec<_>>()
1919        );
1920        assert_eq!(
1921            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1922            in_memory_changesets
1923                .into_iter()
1924                .sorted_by_key(|(address, _, _)| *address)
1925                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1926                .collect::<Vec<_>>()
1927        );
1928
1929        Ok(())
1930    }
1931    #[test]
1932    fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
1933        use alloy_primitives::U256;
1934        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
1935        use reth_primitives_traits::StorageEntry;
1936        use reth_storage_api::StorageSettingsCache;
1937        use std::collections::HashMap;
1938
1939        let address = alloy_primitives::Address::with_last_byte(1);
1940        let account = reth_primitives_traits::Account {
1941            nonce: 1,
1942            balance: U256::from(1000),
1943            bytecode_hash: None,
1944        };
1945        let slot = U256::from(0x42);
1946        let slot_b256 = B256::from(slot);
1947
1948        let mut rng = generators::rng();
1949        let factory = create_test_provider_factory();
1950        factory.set_storage_settings_cache(StorageSettings::v1());
1951
1952        let blocks = random_block_range(
1953            &mut rng,
1954            0..=1,
1955            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1956        );
1957
1958        let provider_rw = factory.provider_rw()?;
1959        provider_rw.append_blocks_with_state(
1960            blocks
1961                .into_iter()
1962                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1963                .collect(),
1964            &ExecutionOutcome {
1965                bundle: BundleState::new(
1966                    [(address, None, Some(account.into()), {
1967                        let mut s = HashMap::default();
1968                        s.insert(slot, (U256::ZERO, U256::from(100)));
1969                        s
1970                    })],
1971                    [
1972                        Vec::new(),
1973                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
1974                    ],
1975                    [],
1976                ),
1977                first_block: 0,
1978                ..Default::default()
1979            },
1980            Default::default(),
1981        )?;
1982
1983        provider_rw.tx_ref().put::<tables::PlainStorageState>(
1984            address,
1985            StorageEntry { key: slot_b256, value: U256::from(100) },
1986        )?;
1987        provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
1988
1989        provider_rw.commit()?;
1990
1991        let provider = BlockchainProvider::new(factory)?;
1992        let consistent_provider = provider.consistent_provider()?;
1993
1994        let outcome = consistent_provider.get_state(1)?.expect("should return execution outcome");
1995
1996        let state = &outcome.bundle.state;
1997        let account_state = state.get(&address).expect("should have account in bundle state");
1998        let storage = &account_state.storage;
1999
2000        let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2001
2002        assert_eq!(
2003            storage_slot.present_value,
2004            U256::from(100),
2005            "present_value should be 100 (the actual value in PlainStorageState)"
2006        );
2007
2008        Ok(())
2009    }
2010
2011    #[test]
2012    fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2013        use alloy_primitives::U256;
2014        use reth_db_api::models::StorageSettings;
2015        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2016        use std::collections::HashMap;
2017
2018        let mut rng = generators::rng();
2019        let factory = create_test_provider_factory();
2020        factory.set_storage_settings_cache(StorageSettings::v1());
2021
2022        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2023
2024        let address = alloy_primitives::Address::with_last_byte(1);
2025        let account = reth_primitives_traits::Account {
2026            nonce: 1,
2027            balance: U256::from(1000),
2028            bytecode_hash: None,
2029        };
2030        let slot = U256::from(0x42);
2031
2032        let provider_rw = factory.provider_rw()?;
2033        provider_rw.append_blocks_with_state(
2034            database_blocks
2035                .into_iter()
2036                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2037                .collect(),
2038            &ExecutionOutcome {
2039                bundle: BundleState::new(
2040                    [(address, None, Some(account.into()), {
2041                        let mut s = HashMap::default();
2042                        s.insert(slot, (U256::ZERO, U256::from(100)));
2043                        s
2044                    })],
2045                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2046                    [],
2047                ),
2048                first_block: 0,
2049                ..Default::default()
2050            },
2051            Default::default(),
2052        )?;
2053        provider_rw.commit()?;
2054
2055        let provider = BlockchainProvider::new(factory)?;
2056
2057        let in_mem_block = in_memory_blocks.first().unwrap();
2058        let senders = in_mem_block.senders().expect("failed to recover senders");
2059        let chain = NewCanonicalChain::Commit {
2060            new: vec![ExecutedBlock {
2061                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2062                    in_mem_block.clone(),
2063                    senders,
2064                )),
2065                execution_output: Arc::new(BlockExecutionOutput {
2066                    state: BundleState::new(
2067                        [(address, None, Some(account.into()), {
2068                            let mut s = HashMap::default();
2069                            s.insert(slot, (U256::from(100), U256::from(200)));
2070                            s
2071                        })],
2072                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2073                        [],
2074                    ),
2075                    result: BlockExecutionResult {
2076                        receipts: Default::default(),
2077                        requests: Default::default(),
2078                        gas_used: 0,
2079                        blob_gas_used: 0,
2080                    },
2081                }),
2082                ..Default::default()
2083            }],
2084        };
2085        provider.canonical_in_memory_state.update_chain(chain);
2086
2087        let consistent_provider = provider.consistent_provider()?;
2088
2089        let db_changeset = consistent_provider.storage_changeset(0)?;
2090        let mem_changeset = consistent_provider.storage_changeset(1)?;
2091
2092        let slot_b256 = B256::from(slot);
2093
2094        assert_eq!(db_changeset.len(), 1);
2095        assert_eq!(mem_changeset.len(), 1);
2096
2097        let db_key = db_changeset[0].1.key;
2098        let mem_key = mem_changeset[0].1.key;
2099
2100        assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2101        assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2102        assert_eq!(
2103            db_key, mem_key,
2104            "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2105        );
2106
2107        Ok(())
2108    }
2109
2110    #[test]
2111    fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2112        use alloy_primitives::U256;
2113        use reth_db_api::models::StorageSettings;
2114        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2115        use std::collections::HashMap;
2116
2117        let mut rng = generators::rng();
2118        let factory = create_test_provider_factory();
2119        factory.set_storage_settings_cache(StorageSettings::v1());
2120
2121        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2122
2123        let address = alloy_primitives::Address::with_last_byte(1);
2124        let account = reth_primitives_traits::Account {
2125            nonce: 1,
2126            balance: U256::from(1000),
2127            bytecode_hash: None,
2128        };
2129        let slot = U256::from(0x42);
2130
2131        let provider_rw = factory.provider_rw()?;
2132        provider_rw.append_blocks_with_state(
2133            database_blocks
2134                .into_iter()
2135                .map(|b| b.try_recover().expect("failed to seal block with senders"))
2136                .collect(),
2137            &ExecutionOutcome {
2138                bundle: BundleState::new(
2139                    [(address, None, Some(account.into()), {
2140                        let mut s = HashMap::default();
2141                        s.insert(slot, (U256::ZERO, U256::from(100)));
2142                        s
2143                    })],
2144                    vec![
2145                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2146                        vec![],
2147                    ],
2148                    [],
2149                ),
2150                first_block: 0,
2151                ..Default::default()
2152            },
2153            Default::default(),
2154        )?;
2155        provider_rw.commit()?;
2156
2157        let provider = BlockchainProvider::new(factory)?;
2158
2159        let in_mem_block = in_memory_blocks.first().unwrap();
2160        let senders = in_mem_block.senders().expect("failed to recover senders");
2161        let chain = NewCanonicalChain::Commit {
2162            new: vec![ExecutedBlock {
2163                recovered_block: Arc::new(RecoveredBlock::new_sealed(
2164                    in_mem_block.clone(),
2165                    senders,
2166                )),
2167                execution_output: Arc::new(BlockExecutionOutput {
2168                    state: BundleState::new(
2169                        [(address, None, Some(account.into()), {
2170                            let mut s = HashMap::default();
2171                            s.insert(slot, (U256::from(100), U256::from(200)));
2172                            s
2173                        })],
2174                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2175                        [],
2176                    ),
2177                    result: BlockExecutionResult {
2178                        receipts: Default::default(),
2179                        requests: Default::default(),
2180                        gas_used: 0,
2181                        blob_gas_used: 0,
2182                    },
2183                }),
2184                ..Default::default()
2185            }],
2186        };
2187        provider.canonical_in_memory_state.update_chain(chain);
2188
2189        let consistent_provider = provider.consistent_provider()?;
2190
2191        let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2192
2193        assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2194
2195        let slot_b256 = B256::from(slot);
2196        let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2197
2198        assert_eq!(
2199            keys[0], keys[1],
2200            "same logical slot should produce identical keys whether from DB or memory"
2201        );
2202        assert_eq!(
2203            keys[0], slot_b256,
2204            "keys should be plain/unhashed when use_hashed_state is false"
2205        );
2206
2207        Ok(())
2208    }
2209}