1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{
16 map::{hash_map, HashMap},
17 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{
25 Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry, StorageSlotKey,
26};
27use reth_prune_types::{PruneCheckpoint, PruneSegment};
28use reth_stages_types::{StageCheckpoint, StageId};
29use reth_static_file_types::StaticFileSegment;
30use reth_storage_api::{
31 BlockBodyIndicesProvider, ChangesetEntry, DatabaseProviderFactory, NodePrimitivesProvider,
32 StateProvider, StateProviderBox, StorageChangeSetReader, StorageSettingsCache,
33 TryIntoHistoricalStateProvider,
34};
35use reth_storage_errors::provider::ProviderResult;
36use revm_database::states::PlainStorageRevert;
37use std::{
38 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
39 sync::Arc,
40};
41use tracing::trace;
42
43#[derive(Debug)]
50#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
52 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
54 head_block: Option<Arc<BlockState<N::Primitives>>>,
56 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
58}
59
60impl<N: ProviderNodeTypes> ConsistentProvider<N> {
61 pub fn new(
67 storage_provider_factory: ProviderFactory<N>,
68 state: CanonicalInMemoryState<N::Primitives>,
69 ) -> ProviderResult<Self> {
70 let head_block = state.head_state();
78 let storage_provider = storage_provider_factory.database_provider_ro()?;
79 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
80 }
81
82 fn convert_range_bounds<T>(
84 &self,
85 range: impl RangeBounds<T>,
86 end_unbounded: impl FnOnce() -> T,
87 ) -> (T, T)
88 where
89 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
90 {
91 let start = match range.start_bound() {
92 Bound::Included(&n) => n,
93 Bound::Excluded(&n) => n + T::from(1u8),
94 Bound::Unbounded => T::from(0u8),
95 };
96
97 let end = match range.end_bound() {
98 Bound::Included(&n) => n,
99 Bound::Excluded(&n) => n - T::from(1u8),
100 Bound::Unbounded => end_unbounded(),
101 };
102
103 (start, end)
104 }
105
106 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
108 trace!(target: "providers::blockchain", "Getting latest block state provider");
109
110 if let Some(state) = &self.head_block {
112 trace!(target: "providers::blockchain", "Using head state for latest state provider");
113 Ok(self.block_state_provider_ref(state)?.boxed())
114 } else {
115 trace!(target: "providers::blockchain", "Using database state for latest state provider");
116 Ok(self.storage_provider.latest())
117 }
118 }
119
120 fn history_by_block_hash_ref<'a>(
121 &'a self,
122 block_hash: BlockHash,
123 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
124 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
125
126 self.get_in_memory_or_storage_by_block(
127 block_hash.into(),
128 |_| self.storage_provider.history_by_block_hash(block_hash),
129 |block_state| {
130 let state_provider = self.block_state_provider_ref(block_state)?;
131 Ok(Box::new(state_provider))
132 },
133 )
134 }
135
136 fn state_by_block_number_ref<'a>(
138 &'a self,
139 number: BlockNumber,
140 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
141 let hash =
142 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
143 self.history_by_block_hash_ref(hash)
144 }
145
146 pub fn get_state(
150 &self,
151 range: RangeInclusive<BlockNumber>,
152 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
153 if range.is_empty() {
154 return Ok(None)
155 }
156 let start_block_number = *range.start();
157 let end_block_number = *range.end();
158
159 let mut block_bodies = Vec::new();
161 for block_num in range.clone() {
162 let block_body = self
163 .block_body_indices(block_num)?
164 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
165 block_bodies.push((block_num, block_body))
166 }
167
168 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
170 else {
171 return Ok(None)
172 };
173 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
174 return Ok(None)
175 };
176
177 let mut account_changeset = Vec::new();
178 for block_num in range.clone() {
179 let changeset =
180 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
181 account_changeset.extend(changeset);
182 }
183
184 let mut storage_changeset = Vec::new();
185 for block_num in range {
186 let changeset = self.storage_changeset(block_num)?;
187 storage_changeset.extend(changeset);
188 }
189
190 let (state, reverts) =
191 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
192
193 let mut receipt_iter =
194 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
195
196 let mut receipts = Vec::with_capacity(block_bodies.len());
197 for (_, block_body) in block_bodies {
199 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
200 for tx_num in block_body.tx_num_range() {
201 let receipt = receipt_iter
202 .next()
203 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
204 block_receipts.push(receipt);
205 }
206 receipts.push(block_receipts);
207 }
208
209 Ok(Some(ExecutionOutcome::new_init(
210 state,
211 reverts,
212 Vec::new(),
214 receipts,
215 start_block_number,
216 Vec::new(),
217 )))
218 }
219
220 fn populate_bundle_state(
227 &self,
228 account_changeset: Vec<(u64, AccountBeforeTx)>,
229 storage_changeset: Vec<(BlockNumberAddress, ChangesetEntry)>,
230 block_range_end: BlockNumber,
231 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
232 let mut state: BundleStateInit = HashMap::default();
233 let mut reverts: RevertsInit = HashMap::default();
234 let state_provider = self.state_by_block_number_ref(block_range_end)?;
235
236 for (block_number, account_before) in account_changeset.into_iter().rev() {
238 let AccountBeforeTx { info: old_info, address } = account_before;
239 match state.entry(address) {
240 hash_map::Entry::Vacant(entry) => {
241 let new_info = state_provider.basic_account(&address)?;
242 entry.insert((old_info, new_info, HashMap::default()));
243 }
244 hash_map::Entry::Occupied(mut entry) => {
245 entry.get_mut().0 = old_info;
247 }
248 }
249 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
251 }
252
253 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
255 let BlockNumberAddress((block_number, address)) = block_and_address;
256 let account_state = match state.entry(address) {
258 hash_map::Entry::Vacant(entry) => {
259 let present_info = state_provider.basic_account(&address)?;
260 entry.insert((present_info, present_info, HashMap::default()))
261 }
262 hash_map::Entry::Occupied(entry) => entry.into_mut(),
263 };
264
265 match account_state.2.entry(old_storage.key.as_b256()) {
267 hash_map::Entry::Vacant(entry) => {
268 let new_storage_value = match old_storage.key {
269 StorageSlotKey::Hashed(_) => state_provider
270 .storage_by_hashed_key(address, old_storage.key.as_b256())?
271 .unwrap_or_default(),
272 StorageSlotKey::Plain(_) => state_provider
273 .storage(address, old_storage.key.as_b256())?
274 .unwrap_or_default(),
275 };
276 entry.insert((old_storage.value, new_storage_value));
277 }
278 hash_map::Entry::Occupied(mut entry) => {
279 entry.get_mut().0 = old_storage.value;
280 }
281 };
282
283 reverts
284 .entry(block_number)
285 .or_default()
286 .entry(address)
287 .or_default()
288 .1
289 .push(StorageEntry::from(old_storage));
290 }
291
292 Ok((state, reverts))
293 }
294
295 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
307 &self,
308 range: impl RangeBounds<BlockNumber>,
309 fetch_db_range: F,
310 map_block_state_item: G,
311 mut predicate: P,
312 ) -> ProviderResult<Vec<T>>
313 where
314 F: FnOnce(
315 &DatabaseProviderRO<N::DB, N>,
316 RangeInclusive<BlockNumber>,
317 &mut P,
318 ) -> ProviderResult<Vec<T>>,
319 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
320 P: FnMut(&T) -> bool,
321 {
322 let mut in_memory_chain =
330 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
331 let db_provider = &self.storage_provider;
332
333 let (start, end) = self.convert_range_bounds(range, || {
334 in_memory_chain
336 .first()
337 .map(|b| b.number())
338 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
339 });
340
341 if start > end {
342 return Ok(vec![])
343 }
344
345 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
350 Some(lowest_memory_block) if lowest_memory_block <= end => {
351 let highest_memory_block =
352 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
353
354 let in_memory_range =
358 lowest_memory_block.max(start)..=end.min(highest_memory_block);
359
360 in_memory_chain.truncate(
363 in_memory_chain
364 .len()
365 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
366 );
367
368 let storage_range =
369 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
370
371 (Some((in_memory_chain, in_memory_range)), storage_range)
372 }
373 _ => {
374 drop(in_memory_chain);
376
377 (None, Some(start..=end))
378 }
379 };
380
381 let mut items = Vec::with_capacity((end - start + 1) as usize);
382
383 if let Some(storage_range) = storage_range {
384 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
385 items.append(&mut db_items);
386
387 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
390 return Ok(items)
391 }
392 }
393
394 if let Some((in_memory_chain, in_memory_range)) = in_memory {
395 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
396 debug_assert!(num == block.number());
397 if let Some(item) = map_block_state_item(block, &mut predicate) {
398 items.push(item);
399 } else {
400 break
401 }
402 }
403 }
404
405 Ok(items)
406 }
407
408 fn block_state_provider_ref(
410 &self,
411 state: &BlockState<N::Primitives>,
412 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
413 let anchor_hash = state.anchor().hash;
414 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
415 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
416 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
417 }
418
419 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
425 &self,
426 range: impl RangeBounds<BlockNumber>,
427 fetch_from_db: S,
428 fetch_from_block_state: M,
429 ) -> ProviderResult<Vec<R>>
430 where
431 S: FnOnce(
432 &DatabaseProviderRO<N::DB, N>,
433 RangeInclusive<TxNumber>,
434 ) -> ProviderResult<Vec<R>>,
435 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
436 {
437 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
438 let provider = &self.storage_provider;
439
440 let last_database_block_number = in_mem_chain
443 .last()
444 .map(|b| Ok(b.anchor().number))
445 .unwrap_or_else(|| provider.last_block_number())?;
446
447 let last_block_body_index = provider
450 .block_body_indices(last_database_block_number)?
451 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
452 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
453
454 let (start, end) = self.convert_range_bounds(range, || {
455 in_mem_chain
456 .iter()
457 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
458 .sum::<u64>() +
459 last_block_body_index.last_tx_num()
460 });
461
462 if start > end {
463 return Ok(vec![])
464 }
465
466 let mut tx_range = start..=end;
467
468 if *tx_range.end() < in_memory_tx_num {
471 return fetch_from_db(provider, tx_range);
472 }
473
474 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
475
476 if *tx_range.start() < in_memory_tx_num {
478 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
480
481 tx_range = in_memory_tx_num..=*tx_range.end();
483
484 items.extend(fetch_from_db(provider, db_range)?);
485 }
486
487 for block_state in in_mem_chain.iter().rev() {
489 let block_tx_count =
490 block_state.block_ref().recovered_block().body().transactions().len();
491 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
492
493 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
496 in_memory_tx_num += block_tx_count as u64;
497 continue
498 }
499
500 let skip = (tx_range.start() - in_memory_tx_num) as usize;
502
503 items.extend(fetch_from_block_state(
504 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
505 block_state,
506 )?);
507
508 in_memory_tx_num += block_tx_count as u64;
509
510 if in_memory_tx_num > *tx_range.end() {
512 break
513 }
514
515 tx_range = in_memory_tx_num..=*tx_range.end();
517 }
518
519 Ok(items)
520 }
521
522 fn get_in_memory_or_storage_by_tx<S, M, R>(
525 &self,
526 id: HashOrNumber,
527 fetch_from_db: S,
528 fetch_from_block_state: M,
529 ) -> ProviderResult<Option<R>>
530 where
531 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
532 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
533 {
534 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
535 let provider = &self.storage_provider;
536
537 let last_database_block_number = in_mem_chain
540 .last()
541 .map(|b| Ok(b.anchor().number))
542 .unwrap_or_else(|| provider.last_block_number())?;
543
544 let last_block_body_index = provider
547 .block_body_indices(last_database_block_number)?
548 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
549 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
550
551 if let HashOrNumber::Number(id) = id &&
554 id < in_memory_tx_num
555 {
556 return fetch_from_db(provider)
557 }
558
559 for block_state in in_mem_chain.iter().rev() {
561 let executed_block = block_state.block_ref();
562 let block = executed_block.recovered_block();
563
564 for tx_index in 0..block.body().transactions().len() {
565 match id {
566 HashOrNumber::Hash(tx_hash) => {
567 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
568 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
569 }
570 }
571 HashOrNumber::Number(id) => {
572 if id == in_memory_tx_num {
573 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
574 }
575 }
576 }
577
578 in_memory_tx_num += 1;
579 }
580 }
581
582 if let HashOrNumber::Hash(_) = id {
584 return fetch_from_db(provider)
585 }
586
587 Ok(None)
588 }
589
590 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
592 &self,
593 id: BlockHashOrNumber,
594 fetch_from_db: S,
595 fetch_from_block_state: M,
596 ) -> ProviderResult<R>
597 where
598 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
599 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
600 {
601 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
602 return fetch_from_block_state(block_state)
603 }
604 fetch_from_db(&self.storage_provider)
605 }
606
607 pub(crate) fn into_state_provider_at_block_hash(
609 self,
610 block_hash: BlockHash,
611 ) -> ProviderResult<StateProviderBox> {
612 let Self { storage_provider, head_block, .. } = self;
613 let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
614 let block_number = storage_provider
615 .block_number(block_hash)?
616 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
617 storage_provider.try_into_history_at_block(block_number)
618 };
619 if let Some(Some(block_state)) =
620 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
621 {
622 let anchor_hash = block_state.anchor().hash;
623 let latest_historical = into_history_at_block_hash(anchor_hash)?;
624 return Ok(Box::new(block_state.state_provider(latest_historical)));
625 }
626 into_history_at_block_hash(block_hash)
627 }
628}
629
630impl<N: ProviderNodeTypes> ConsistentProvider<N> {
631 #[inline]
640 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
641 let latest = self.best_block_number()?;
642 if block_number > latest {
643 Err(ProviderError::HeaderNotFound(block_number.into()))
644 } else {
645 Ok(())
646 }
647 }
648}
649
650impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
651 type Primitives = N::Primitives;
652}
653
654impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
655 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
656 self.storage_provider.static_file_provider()
657 }
658
659 fn get_static_file_writer(
660 &self,
661 block: BlockNumber,
662 segment: StaticFileSegment,
663 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
664 self.storage_provider.get_static_file_writer(block, segment)
665 }
666}
667
668impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
669 type Header = HeaderTy<N>;
670
671 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
672 self.get_in_memory_or_storage_by_block(
673 block_hash.into(),
674 |db_provider| db_provider.header(block_hash),
675 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
676 )
677 }
678
679 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
680 self.get_in_memory_or_storage_by_block(
681 num.into(),
682 |db_provider| db_provider.header_by_number(num),
683 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
684 )
685 }
686
687 fn headers_range(
688 &self,
689 range: impl RangeBounds<BlockNumber>,
690 ) -> ProviderResult<Vec<Self::Header>> {
691 self.get_in_memory_or_storage_by_block_range_while(
692 range,
693 |db_provider, range, _| db_provider.headers_range(range),
694 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
695 |_| true,
696 )
697 }
698
699 fn sealed_header(
700 &self,
701 number: BlockNumber,
702 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
703 self.get_in_memory_or_storage_by_block(
704 number.into(),
705 |db_provider| db_provider.sealed_header(number),
706 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
707 )
708 }
709
710 fn sealed_headers_range(
711 &self,
712 range: impl RangeBounds<BlockNumber>,
713 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
714 self.get_in_memory_or_storage_by_block_range_while(
715 range,
716 |db_provider, range, _| db_provider.sealed_headers_range(range),
717 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
718 |_| true,
719 )
720 }
721
722 fn sealed_headers_while(
723 &self,
724 range: impl RangeBounds<BlockNumber>,
725 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
726 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
727 self.get_in_memory_or_storage_by_block_range_while(
728 range,
729 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
730 |block_state, predicate| {
731 let header = block_state.block_ref().recovered_block().sealed_header();
732 predicate(header).then(|| header.clone())
733 },
734 predicate,
735 )
736 }
737}
738
739impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
740 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
741 self.get_in_memory_or_storage_by_block(
742 number.into(),
743 |db_provider| db_provider.block_hash(number),
744 |block_state| Ok(Some(block_state.hash())),
745 )
746 }
747
748 fn canonical_hashes_range(
749 &self,
750 start: BlockNumber,
751 end: BlockNumber,
752 ) -> ProviderResult<Vec<B256>> {
753 self.get_in_memory_or_storage_by_block_range_while(
754 start..end,
755 |db_provider, inclusive_range, _| {
756 db_provider
757 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
758 },
759 |block_state, _| Some(block_state.hash()),
760 |_| true,
761 )
762 }
763}
764
765impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
766 fn chain_info(&self) -> ProviderResult<ChainInfo> {
767 let best_number = self.best_block_number()?;
768 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
769 }
770
771 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
772 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
773 }
774
775 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
776 self.storage_provider.last_block_number()
777 }
778
779 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
780 self.get_in_memory_or_storage_by_block(
781 hash.into(),
782 |db_provider| db_provider.block_number(hash),
783 |block_state| Ok(Some(block_state.number())),
784 )
785 }
786}
787
788impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
789 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
790 Ok(self.canonical_in_memory_state.pending_block_num_hash())
791 }
792
793 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
794 Ok(self.canonical_in_memory_state.get_safe_num_hash())
795 }
796
797 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
798 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
799 }
800}
801
802impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
803 type Block = BlockTy<N>;
804
805 fn find_block_by_hash(
806 &self,
807 hash: B256,
808 source: BlockSource,
809 ) -> ProviderResult<Option<Self::Block>> {
810 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
811 let Some(block) = self.get_in_memory_or_storage_by_block(
812 hash.into(),
813 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
814 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
815 )?
816 {
817 return Ok(Some(block))
818 }
819
820 if matches!(source, BlockSource::Pending | BlockSource::Any) {
821 return Ok(self
822 .canonical_in_memory_state
823 .pending_block()
824 .filter(|b| b.hash() == hash)
825 .map(|b| b.into_block()))
826 }
827
828 Ok(None)
829 }
830
831 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
832 self.get_in_memory_or_storage_by_block(
833 id,
834 |db_provider| db_provider.block(id),
835 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
836 )
837 }
838
839 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
840 Ok(self.canonical_in_memory_state.pending_recovered_block())
841 }
842
843 fn pending_block_and_receipts(
844 &self,
845 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
846 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
847 }
848
849 fn recovered_block(
856 &self,
857 id: BlockHashOrNumber,
858 transaction_kind: TransactionVariant,
859 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860 self.get_in_memory_or_storage_by_block(
861 id,
862 |db_provider| db_provider.recovered_block(id, transaction_kind),
863 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864 )
865 }
866
867 fn sealed_block_with_senders(
868 &self,
869 id: BlockHashOrNumber,
870 transaction_kind: TransactionVariant,
871 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
872 self.get_in_memory_or_storage_by_block(
873 id,
874 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
875 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
876 )
877 }
878
879 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
880 self.get_in_memory_or_storage_by_block_range_while(
881 range,
882 |db_provider, range, _| db_provider.block_range(range),
883 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
884 |_| true,
885 )
886 }
887
888 fn block_with_senders_range(
889 &self,
890 range: RangeInclusive<BlockNumber>,
891 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892 self.get_in_memory_or_storage_by_block_range_while(
893 range,
894 |db_provider, range, _| db_provider.block_with_senders_range(range),
895 |block_state, _| Some(block_state.block().recovered_block().clone()),
896 |_| true,
897 )
898 }
899
900 fn recovered_block_range(
901 &self,
902 range: RangeInclusive<BlockNumber>,
903 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
904 self.get_in_memory_or_storage_by_block_range_while(
905 range,
906 |db_provider, range, _| db_provider.recovered_block_range(range),
907 |block_state, _| Some(block_state.block().recovered_block().clone()),
908 |_| true,
909 )
910 }
911
912 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
913 self.get_in_memory_or_storage_by_tx(
914 id.into(),
915 |db_provider| db_provider.block_by_transaction_id(id),
916 |_, _, block_state| Ok(Some(block_state.number())),
917 )
918 }
919}
920
921impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
922 type Transaction = TxTy<N>;
923
924 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
925 self.get_in_memory_or_storage_by_tx(
926 tx_hash.into(),
927 |db_provider| db_provider.transaction_id(tx_hash),
928 |_, tx_number, _| Ok(Some(tx_number)),
929 )
930 }
931
932 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
933 self.get_in_memory_or_storage_by_tx(
934 id.into(),
935 |provider| provider.transaction_by_id(id),
936 |tx_index, _, block_state| {
937 Ok(block_state
938 .block_ref()
939 .recovered_block()
940 .body()
941 .transactions()
942 .get(tx_index)
943 .cloned())
944 },
945 )
946 }
947
948 fn transaction_by_id_unhashed(
949 &self,
950 id: TxNumber,
951 ) -> ProviderResult<Option<Self::Transaction>> {
952 self.get_in_memory_or_storage_by_tx(
953 id.into(),
954 |provider| provider.transaction_by_id_unhashed(id),
955 |tx_index, _, block_state| {
956 Ok(block_state
957 .block_ref()
958 .recovered_block()
959 .body()
960 .transactions()
961 .get(tx_index)
962 .cloned())
963 },
964 )
965 }
966
967 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
968 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
969 return Ok(Some(tx))
970 }
971
972 self.storage_provider.transaction_by_hash(hash)
973 }
974
975 fn transaction_by_hash_with_meta(
976 &self,
977 tx_hash: TxHash,
978 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
979 if let Some((tx, meta)) =
980 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
981 {
982 return Ok(Some((tx, meta)))
983 }
984
985 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
986 }
987
988 fn transactions_by_block(
989 &self,
990 id: BlockHashOrNumber,
991 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
992 self.get_in_memory_or_storage_by_block(
993 id,
994 |provider| provider.transactions_by_block(id),
995 |block_state| {
996 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
997 },
998 )
999 }
1000
1001 fn transactions_by_block_range(
1002 &self,
1003 range: impl RangeBounds<BlockNumber>,
1004 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1005 self.get_in_memory_or_storage_by_block_range_while(
1006 range,
1007 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1008 |block_state, _| {
1009 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1010 },
1011 |_| true,
1012 )
1013 }
1014
1015 fn transactions_by_tx_range(
1016 &self,
1017 range: impl RangeBounds<TxNumber>,
1018 ) -> ProviderResult<Vec<Self::Transaction>> {
1019 self.get_in_memory_or_storage_by_tx_range(
1020 range,
1021 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1022 |index_range, block_state| {
1023 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1024 .to_vec())
1025 },
1026 )
1027 }
1028
1029 fn senders_by_tx_range(
1030 &self,
1031 range: impl RangeBounds<TxNumber>,
1032 ) -> ProviderResult<Vec<Address>> {
1033 self.get_in_memory_or_storage_by_tx_range(
1034 range,
1035 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1036 |index_range, block_state| {
1037 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1038 },
1039 )
1040 }
1041
1042 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1043 self.get_in_memory_or_storage_by_tx(
1044 id.into(),
1045 |provider| provider.transaction_sender(id),
1046 |tx_index, _, block_state| {
1047 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1048 },
1049 )
1050 }
1051}
1052
1053impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1054 type Receipt = ReceiptTy<N>;
1055
1056 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1057 self.get_in_memory_or_storage_by_tx(
1058 id.into(),
1059 |provider| provider.receipt(id),
1060 |tx_index, _, block_state| {
1061 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1062 },
1063 )
1064 }
1065
1066 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1067 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1068 let executed_block = block_state.block_ref();
1069 let block = executed_block.recovered_block();
1070 let receipts = block_state.executed_block_receipts_ref();
1071
1072 debug_assert_eq!(
1074 block.body().transactions().len(),
1075 receipts.len(),
1076 "Mismatch between transaction and receipt count"
1077 );
1078
1079 if let Some(tx_index) =
1080 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1081 {
1082 return Ok(receipts.get(tx_index).cloned());
1084 }
1085 }
1086
1087 self.storage_provider.receipt_by_hash(hash)
1088 }
1089
1090 fn receipts_by_block(
1091 &self,
1092 block: BlockHashOrNumber,
1093 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1094 self.get_in_memory_or_storage_by_block(
1095 block,
1096 |db_provider| db_provider.receipts_by_block(block),
1097 |block_state| Ok(Some(block_state.executed_block_receipts())),
1098 )
1099 }
1100
1101 fn receipts_by_tx_range(
1102 &self,
1103 range: impl RangeBounds<TxNumber>,
1104 ) -> ProviderResult<Vec<Self::Receipt>> {
1105 self.get_in_memory_or_storage_by_tx_range(
1106 range,
1107 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1108 |index_range, block_state| {
1109 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1110 },
1111 )
1112 }
1113
1114 fn receipts_by_block_range(
1115 &self,
1116 block_range: RangeInclusive<BlockNumber>,
1117 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1118 self.storage_provider.receipts_by_block_range(block_range)
1119 }
1120}
1121
1122impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1123 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1124 match block {
1125 BlockId::Hash(rpc_block_hash) => {
1126 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1127 if receipts.is_none() &&
1128 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1129 let Some(state) = self
1130 .head_block
1131 .as_ref()
1132 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1133 {
1134 receipts = Some(state.executed_block_receipts());
1135 }
1136 Ok(receipts)
1137 }
1138 BlockId::Number(num_tag) => match num_tag {
1139 BlockNumberOrTag::Pending => Ok(self
1140 .canonical_in_memory_state
1141 .pending_state()
1142 .map(|block_state| block_state.executed_block_receipts())),
1143 _ => {
1144 if let Some(num) = self.convert_block_number(num_tag)? {
1145 self.receipts_by_block(num.into())
1146 } else {
1147 Ok(None)
1148 }
1149 }
1150 },
1151 }
1152 }
1153}
1154
1155impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1156 fn block_body_indices(
1157 &self,
1158 number: BlockNumber,
1159 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1160 self.get_in_memory_or_storage_by_block(
1161 number.into(),
1162 |db_provider| db_provider.block_body_indices(number),
1163 |block_state| {
1164 let last_storage_block_number = block_state.anchor().number;
1166 let mut stored_indices = self
1167 .storage_provider
1168 .block_body_indices(last_storage_block_number)?
1169 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1170
1171 stored_indices.first_tx_num = stored_indices.next_tx_num();
1173 stored_indices.tx_count = 0;
1174
1175 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1177 let block_tx_count =
1178 state.block_ref().recovered_block().body().transactions().len() as u64;
1179 if state.block_ref().recovered_block().number() == number {
1180 stored_indices.tx_count = block_tx_count;
1181 } else {
1182 stored_indices.first_tx_num += block_tx_count;
1183 }
1184 }
1185
1186 Ok(Some(stored_indices))
1187 },
1188 )
1189 }
1190
1191 fn block_body_indices_range(
1192 &self,
1193 range: RangeInclusive<BlockNumber>,
1194 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1195 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1196 }
1197}
1198
1199impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1200 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1201 self.storage_provider.get_stage_checkpoint(id)
1202 }
1203
1204 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1205 self.storage_provider.get_stage_checkpoint_progress(id)
1206 }
1207
1208 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1209 self.storage_provider.get_all_checkpoints()
1210 }
1211}
1212
1213impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1214 fn get_prune_checkpoint(
1215 &self,
1216 segment: PruneSegment,
1217 ) -> ProviderResult<Option<PruneCheckpoint>> {
1218 self.storage_provider.get_prune_checkpoint(segment)
1219 }
1220
1221 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1222 self.storage_provider.get_prune_checkpoints()
1223 }
1224}
1225
1226impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1227 type ChainSpec = N::ChainSpec;
1228
1229 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1230 ChainSpecProvider::chain_spec(&self.storage_provider)
1231 }
1232}
1233
1234impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1235 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1236 match id {
1237 BlockId::Number(num) => self.block_by_number_or_tag(num),
1238 BlockId::Hash(hash) => {
1239 if Some(true) == hash.require_canonical {
1244 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1246 } else {
1247 self.block_by_hash(hash.block_hash)
1248 }
1249 }
1250 }
1251 }
1252
1253 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1254 Ok(match id {
1255 BlockNumberOrTag::Latest => {
1256 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1257 }
1258 BlockNumberOrTag::Finalized => {
1259 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1260 }
1261 BlockNumberOrTag::Safe => {
1262 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1263 }
1264 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1265 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1266
1267 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1268 })
1269 }
1270
1271 fn sealed_header_by_number_or_tag(
1272 &self,
1273 id: BlockNumberOrTag,
1274 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1275 match id {
1276 BlockNumberOrTag::Latest => {
1277 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1278 }
1279 BlockNumberOrTag::Finalized => {
1280 Ok(self.canonical_in_memory_state.get_finalized_header())
1281 }
1282 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1283 BlockNumberOrTag::Earliest => self
1284 .header_by_number(self.earliest_block_number()?)?
1285 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1286 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1287 BlockNumberOrTag::Number(num) => self
1288 .header_by_number(num)?
1289 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1290 }
1291 }
1292
1293 fn sealed_header_by_id(
1294 &self,
1295 id: BlockId,
1296 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1297 Ok(match id {
1298 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1299 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1300 })
1301 }
1302
1303 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1304 Ok(match id {
1305 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1306 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1307 })
1308 }
1309}
1310
1311impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1312 fn storage_changeset(
1313 &self,
1314 block_number: BlockNumber,
1315 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1316 let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1317 if let Some(state) =
1318 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1319 {
1320 let changesets = state
1321 .block()
1322 .execution_output
1323 .state
1324 .reverts
1325 .clone()
1326 .to_plain_state_reverts()
1327 .storage
1328 .into_iter()
1329 .flatten()
1330 .flat_map(|revert: PlainStorageRevert| {
1331 revert.storage_revert.into_iter().map(move |(key, value)| {
1332 let tagged_key = StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1333 (
1334 BlockNumberAddress((block_number, revert.address)),
1335 ChangesetEntry { key: tagged_key, value: value.to_previous_value() },
1336 )
1337 })
1338 })
1339 .collect();
1340 Ok(changesets)
1341 } else {
1342 let storage_history_exists = self
1346 .storage_provider
1347 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1348 .and_then(|checkpoint| {
1349 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1354 })
1355 .unwrap_or(true);
1356
1357 if !storage_history_exists {
1358 return Err(ProviderError::StateAtBlockPruned(block_number))
1359 }
1360
1361 self.storage_provider.storage_changeset(block_number)
1362 }
1363 }
1364
1365 fn get_storage_before_block(
1366 &self,
1367 block_number: BlockNumber,
1368 address: Address,
1369 storage_key: B256,
1370 ) -> ProviderResult<Option<ChangesetEntry>> {
1371 let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1372 if let Some(state) =
1373 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1374 {
1375 let changeset = state
1376 .block_ref()
1377 .execution_output
1378 .state
1379 .reverts
1380 .clone()
1381 .to_plain_state_reverts()
1382 .storage
1383 .into_iter()
1384 .flatten()
1385 .find_map(|revert: PlainStorageRevert| {
1386 if revert.address != address {
1387 return None
1388 }
1389 revert.storage_revert.into_iter().find_map(|(key, value)| {
1390 let tagged_key = StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1391 (tagged_key.as_b256() == storage_key).then(|| ChangesetEntry {
1392 key: tagged_key,
1393 value: value.to_previous_value(),
1394 })
1395 })
1396 });
1397 Ok(changeset)
1398 } else {
1399 let storage_history_exists = self
1400 .storage_provider
1401 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1402 .and_then(|checkpoint| {
1403 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1404 })
1405 .unwrap_or(true);
1406
1407 if !storage_history_exists {
1408 return Err(ProviderError::StateAtBlockPruned(block_number))
1409 }
1410
1411 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1412 }
1413 }
1414
1415 fn storage_changesets_range(
1416 &self,
1417 range: impl RangeBounds<BlockNumber>,
1418 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
1419 let range = to_range(range);
1420 let mut changesets = Vec::new();
1421 let database_start = range.start;
1422 let mut database_end = range.end;
1423
1424 let use_hashed = self.storage_provider.cached_storage_settings().use_hashed_state();
1425
1426 if let Some(head_block) = &self.head_block {
1427 database_end = head_block.anchor().number;
1428
1429 let chain = head_block.chain().collect::<Vec<_>>();
1430 for state in chain {
1431 let block_changesets = state
1432 .block_ref()
1433 .execution_output
1434 .state
1435 .reverts
1436 .clone()
1437 .to_plain_state_reverts()
1438 .storage
1439 .into_iter()
1440 .flatten()
1441 .flat_map(|revert: PlainStorageRevert| {
1442 revert.storage_revert.into_iter().map(move |(key, value)| {
1443 let tagged_key =
1444 StorageSlotKey::from_u256(key).to_changeset(use_hashed);
1445 (
1446 BlockNumberAddress((state.number(), revert.address)),
1447 ChangesetEntry {
1448 key: tagged_key,
1449 value: value.to_previous_value(),
1450 },
1451 )
1452 })
1453 });
1454
1455 changesets.extend(block_changesets);
1456 }
1457 }
1458
1459 if database_start < database_end {
1460 let storage_history_exists = self
1461 .storage_provider
1462 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1463 .and_then(|checkpoint| {
1464 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1465 })
1466 .unwrap_or(true);
1467
1468 if !storage_history_exists {
1469 return Err(ProviderError::StateAtBlockPruned(database_start))
1470 }
1471
1472 let db_changesets = self
1473 .storage_provider
1474 .storage_changesets_range(database_start..=database_end - 1)?;
1475 changesets.extend(db_changesets);
1476 }
1477
1478 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1479
1480 Ok(changesets)
1481 }
1482
1483 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1484 let mut count = 0;
1485 if let Some(head_block) = &self.head_block {
1486 for state in head_block.chain() {
1487 count += state
1488 .block_ref()
1489 .execution_output
1490 .state
1491 .reverts
1492 .clone()
1493 .to_plain_state_reverts()
1494 .storage
1495 .into_iter()
1496 .flatten()
1497 .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1498 .sum::<usize>();
1499 }
1500 }
1501
1502 count += self.storage_provider.storage_changeset_count()?;
1503
1504 Ok(count)
1505 }
1506}
1507
1508impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1509 fn account_block_changeset(
1510 &self,
1511 block_number: BlockNumber,
1512 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1513 if let Some(state) =
1514 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1515 {
1516 let changesets = state
1517 .block_ref()
1518 .execution_output
1519 .state
1520 .reverts
1521 .clone()
1522 .to_plain_state_reverts()
1523 .accounts
1524 .into_iter()
1525 .flatten()
1526 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1527 .collect();
1528 Ok(changesets)
1529 } else {
1530 let account_history_exists = self
1534 .storage_provider
1535 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1536 .and_then(|checkpoint| {
1537 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1542 })
1543 .unwrap_or(true);
1544
1545 if !account_history_exists {
1546 return Err(ProviderError::StateAtBlockPruned(block_number))
1547 }
1548
1549 self.storage_provider.account_block_changeset(block_number)
1550 }
1551 }
1552
1553 fn get_account_before_block(
1554 &self,
1555 block_number: BlockNumber,
1556 address: Address,
1557 ) -> ProviderResult<Option<AccountBeforeTx>> {
1558 if let Some(state) =
1559 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1560 {
1561 let changeset = state
1563 .block_ref()
1564 .execution_output
1565 .state
1566 .reverts
1567 .clone()
1568 .to_plain_state_reverts()
1569 .accounts
1570 .into_iter()
1571 .flatten()
1572 .find(|(addr, _)| addr == &address)
1573 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1574 Ok(changeset)
1575 } else {
1576 let account_history_exists = self
1579 .storage_provider
1580 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1581 .and_then(|checkpoint| {
1582 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1587 })
1588 .unwrap_or(true);
1589
1590 if !account_history_exists {
1591 return Err(ProviderError::StateAtBlockPruned(block_number))
1592 }
1593
1594 self.storage_provider.get_account_before_block(block_number, address)
1596 }
1597 }
1598
1599 fn account_changesets_range(
1600 &self,
1601 range: impl core::ops::RangeBounds<BlockNumber>,
1602 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1603 let range = to_range(range);
1604 let mut changesets = Vec::new();
1605 let database_start = range.start;
1606 let mut database_end = range.end;
1607
1608 if let Some(head_block) = &self.head_block {
1610 database_end = head_block.anchor().number;
1612
1613 let chain = head_block.chain().collect::<Vec<_>>();
1614 for state in chain {
1615 let block_changesets = state
1617 .block_ref()
1618 .execution_output
1619 .state
1620 .reverts
1621 .clone()
1622 .to_plain_state_reverts()
1623 .accounts
1624 .into_iter()
1625 .flatten()
1626 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1627
1628 for changeset in block_changesets {
1629 changesets.push((state.number(), changeset));
1630 }
1631 }
1632 }
1633
1634 if database_start < database_end {
1636 let account_history_exists = self
1638 .storage_provider
1639 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1640 .and_then(|checkpoint| {
1641 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1642 })
1643 .unwrap_or(true);
1644
1645 if !account_history_exists {
1646 return Err(ProviderError::StateAtBlockPruned(database_start))
1647 }
1648
1649 let db_changesets =
1650 self.storage_provider.account_changesets_range(database_start..database_end)?;
1651 changesets.extend(db_changesets);
1652 }
1653
1654 changesets.sort_by_key(|(block_num, _)| *block_num);
1655
1656 Ok(changesets)
1657 }
1658
1659 fn account_changeset_count(&self) -> ProviderResult<usize> {
1660 let mut count = 0;
1662 if let Some(head_block) = &self.head_block {
1663 for state in head_block.chain() {
1664 count += state
1665 .block_ref()
1666 .execution_output
1667 .state
1668 .reverts
1669 .clone()
1670 .to_plain_state_reverts()
1671 .accounts
1672 .len();
1673 }
1674 }
1675
1676 count += self.storage_provider.account_changeset_count()?;
1678
1679 Ok(count)
1680 }
1681}
1682
1683impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1684 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1686 let state_provider = self.latest_ref()?;
1688 state_provider.basic_account(address)
1689 }
1690}
1691
1692impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1693 type Receipt = ReceiptTy<N>;
1694
1695 fn get_state(
1705 &self,
1706 block: BlockNumber,
1707 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1708 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1709 let state = state.block_ref().execution_outcome().clone();
1710 Ok(Some(ExecutionOutcome::from((state, block))))
1711 } else {
1712 Self::get_state(self, block..=block)
1713 }
1714 }
1715}
1716
1717#[cfg(test)]
1718mod tests {
1719 use crate::{
1720 providers::blockchain_provider::BlockchainProvider,
1721 test_utils::create_test_provider_factory, BlockWriter,
1722 };
1723 use alloy_eips::BlockHashOrNumber;
1724 use alloy_primitives::B256;
1725 use itertools::Itertools;
1726 use rand::Rng;
1727 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1728 use reth_db_api::models::AccountBeforeTx;
1729 use reth_ethereum_primitives::Block;
1730 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1731 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1732 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1733 use reth_testing_utils::generators::{
1734 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1735 };
1736 use revm_database::BundleState;
1737 use std::{
1738 ops::{Bound, Range, RangeBounds},
1739 sync::Arc,
1740 };
1741
1742 const TEST_BLOCKS_COUNT: usize = 5;
1743
1744 fn random_blocks(
1745 rng: &mut impl Rng,
1746 database_blocks: usize,
1747 in_memory_blocks: usize,
1748 requests_count: Option<Range<u8>>,
1749 withdrawals_count: Option<Range<u8>>,
1750 tx_count: impl RangeBounds<u8>,
1751 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1752 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1753
1754 let tx_start = match tx_count.start_bound() {
1755 Bound::Included(&n) | Bound::Excluded(&n) => n,
1756 Bound::Unbounded => u8::MIN,
1757 };
1758 let tx_end = match tx_count.end_bound() {
1759 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1760 Bound::Unbounded => u8::MAX,
1761 };
1762
1763 let blocks = random_block_range(
1764 rng,
1765 0..=block_range,
1766 BlockRangeParams {
1767 parent: Some(B256::ZERO),
1768 tx_count: tx_start..tx_end,
1769 requests_count,
1770 withdrawals_count,
1771 },
1772 );
1773 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1774 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1775 }
1776
1777 #[test]
1778 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1779 let mut rng = generators::rng();
1781 let factory = create_test_provider_factory();
1782
1783 let blocks = random_block_range(
1785 &mut rng,
1786 0..=10,
1787 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1788 );
1789 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1790
1791 let provider_rw = factory.provider_rw()?;
1793 for block in database_blocks {
1794 provider_rw.insert_block(
1795 &block.clone().try_recover().expect("failed to seal block with senders"),
1796 )?;
1797 }
1798 provider_rw.commit()?;
1799
1800 let provider = BlockchainProvider::new(factory)?;
1802 let consistent_provider = provider.consistent_provider()?;
1803
1804 let first_db_block = database_blocks.first().unwrap();
1806 let first_in_mem_block = in_memory_blocks.first().unwrap();
1807 let last_in_mem_block = in_memory_blocks.last().unwrap();
1808
1809 assert_eq!(
1811 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1812 None
1813 );
1814 assert_eq!(
1815 consistent_provider
1816 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1817 None
1818 );
1819 assert_eq!(
1821 consistent_provider
1822 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1823 None
1824 );
1825
1826 let in_memory_block_senders =
1828 first_in_mem_block.senders().expect("failed to recover senders");
1829 let chain = NewCanonicalChain::Commit {
1830 new: vec![ExecutedBlock {
1831 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1832 first_in_mem_block.clone(),
1833 in_memory_block_senders,
1834 )),
1835 ..Default::default()
1836 }],
1837 };
1838 consistent_provider.canonical_in_memory_state.update_chain(chain);
1839 let consistent_provider = provider.consistent_provider()?;
1840
1841 assert_eq!(
1843 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1844 Some(first_in_mem_block.clone().into_block())
1845 );
1846 assert_eq!(
1847 consistent_provider
1848 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1849 Some(first_in_mem_block.clone().into_block())
1850 );
1851
1852 assert_eq!(
1854 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1855 Some(first_db_block.clone().into_block())
1856 );
1857 assert_eq!(
1858 consistent_provider
1859 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1860 Some(first_db_block.clone().into_block())
1861 );
1862
1863 assert_eq!(
1865 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1866 None
1867 );
1868
1869 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1871 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1872 last_in_mem_block.clone(),
1873 Default::default(),
1874 )),
1875 ..Default::default()
1876 });
1877
1878 assert_eq!(
1880 consistent_provider
1881 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1882 Some(last_in_mem_block.clone_block())
1883 );
1884
1885 Ok(())
1886 }
1887
1888 #[test]
1889 fn test_block_reader_block() -> eyre::Result<()> {
1890 let mut rng = generators::rng();
1892 let factory = create_test_provider_factory();
1893
1894 let blocks = random_block_range(
1896 &mut rng,
1897 0..=10,
1898 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1899 );
1900 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1901
1902 let provider_rw = factory.provider_rw()?;
1904 for block in database_blocks {
1905 provider_rw.insert_block(
1906 &block.clone().try_recover().expect("failed to seal block with senders"),
1907 )?;
1908 }
1909 provider_rw.commit()?;
1910
1911 let provider = BlockchainProvider::new(factory)?;
1913 let consistent_provider = provider.consistent_provider()?;
1914
1915 let first_in_mem_block = in_memory_blocks.first().unwrap();
1917 let first_db_block = database_blocks.first().unwrap();
1919
1920 assert_eq!(
1922 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1923 None
1924 );
1925 assert_eq!(
1926 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1927 None
1928 );
1929
1930 let in_memory_block_senders =
1932 first_in_mem_block.senders().expect("failed to recover senders");
1933 let chain = NewCanonicalChain::Commit {
1934 new: vec![ExecutedBlock {
1935 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1936 first_in_mem_block.clone(),
1937 in_memory_block_senders,
1938 )),
1939 ..Default::default()
1940 }],
1941 };
1942 consistent_provider.canonical_in_memory_state.update_chain(chain);
1943
1944 let consistent_provider = provider.consistent_provider()?;
1945
1946 assert_eq!(
1948 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1949 Some(first_in_mem_block.clone().into_block())
1950 );
1951 assert_eq!(
1952 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1953 Some(first_in_mem_block.clone().into_block())
1954 );
1955
1956 assert_eq!(
1958 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1959 Some(first_db_block.clone().into_block())
1960 );
1961 assert_eq!(
1962 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1963 Some(first_db_block.clone().into_block())
1964 );
1965
1966 Ok(())
1967 }
1968
1969 #[test]
1970 fn test_changeset_reader() -> eyre::Result<()> {
1971 let mut rng = generators::rng();
1972
1973 let (database_blocks, in_memory_blocks) =
1974 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1975
1976 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1977 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1978 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1979
1980 let accounts = random_eoa_accounts(&mut rng, 2);
1981
1982 let (database_changesets, database_state) = random_changeset_range(
1983 &mut rng,
1984 &database_blocks,
1985 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1986 0..0,
1987 0..0,
1988 );
1989 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1990 &mut rng,
1991 &in_memory_blocks,
1992 database_state
1993 .iter()
1994 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1995 0..0,
1996 0..0,
1997 );
1998
1999 let factory = create_test_provider_factory();
2000
2001 let provider_rw = factory.provider_rw()?;
2002 provider_rw.append_blocks_with_state(
2003 database_blocks
2004 .into_iter()
2005 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2006 .collect(),
2007 &ExecutionOutcome {
2008 bundle: BundleState::new(
2009 database_state.into_iter().map(|(address, (account, _))| {
2010 (address, None, Some(account.into()), Default::default())
2011 }),
2012 database_changesets.iter().map(|block_changesets| {
2013 block_changesets.iter().map(|(address, account, _)| {
2014 (*address, Some(Some((*account).into())), [])
2015 })
2016 }),
2017 Vec::new(),
2018 ),
2019 first_block: first_database_block,
2020 ..Default::default()
2021 },
2022 Default::default(),
2023 )?;
2024 provider_rw.commit()?;
2025
2026 let provider = BlockchainProvider::new(factory)?;
2027
2028 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2029 let chain = NewCanonicalChain::Commit {
2030 new: vec![in_memory_blocks
2031 .first()
2032 .map(|block| {
2033 let senders = block.senders().expect("failed to recover senders");
2034 ExecutedBlock {
2035 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2036 block.clone(),
2037 senders,
2038 )),
2039 execution_output: Arc::new(BlockExecutionOutput {
2040 state: BundleState::new(
2041 in_memory_state.into_iter().map(|(address, (account, _))| {
2042 (address, None, Some(account.into()), Default::default())
2043 }),
2044 [in_memory_changesets.iter().map(|(address, account, _)| {
2045 (*address, Some(Some((*account).into())), Vec::new())
2046 })],
2047 [],
2048 ),
2049 result: BlockExecutionResult {
2050 receipts: Default::default(),
2051 requests: Default::default(),
2052 gas_used: 0,
2053 blob_gas_used: 0,
2054 },
2055 }),
2056 ..Default::default()
2057 }
2058 })
2059 .unwrap()],
2060 };
2061 provider.canonical_in_memory_state.update_chain(chain);
2062
2063 let consistent_provider = provider.consistent_provider()?;
2064
2065 assert_eq!(
2066 consistent_provider.account_block_changeset(last_database_block).unwrap(),
2067 database_changesets
2068 .into_iter()
2069 .next_back()
2070 .unwrap()
2071 .into_iter()
2072 .sorted_by_key(|(address, _, _)| *address)
2073 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2074 .collect::<Vec<_>>()
2075 );
2076 assert_eq!(
2077 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2078 in_memory_changesets
2079 .into_iter()
2080 .sorted_by_key(|(address, _, _)| *address)
2081 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2082 .collect::<Vec<_>>()
2083 );
2084
2085 Ok(())
2086 }
2087
2088 #[test]
2089 fn test_get_state_storage_value_hashed_state() -> eyre::Result<()> {
2090 use alloy_primitives::{keccak256, U256};
2091 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2092 use reth_primitives_traits::StorageEntry;
2093 use reth_storage_api::StorageSettingsCache;
2094 use std::collections::HashMap;
2095
2096 let address = alloy_primitives::Address::with_last_byte(1);
2097 let account = reth_primitives_traits::Account {
2098 nonce: 1,
2099 balance: U256::from(1000),
2100 bytecode_hash: None,
2101 };
2102 let slot = U256::from(0x42);
2103 let slot_b256 = B256::from(slot);
2104 let hashed_address = keccak256(address);
2105 let hashed_slot = keccak256(slot_b256);
2106
2107 let mut rng = generators::rng();
2108 let factory = create_test_provider_factory();
2109 factory.set_storage_settings_cache(StorageSettings::v2());
2110
2111 let blocks = random_block_range(
2112 &mut rng,
2113 0..=1,
2114 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2115 );
2116
2117 let provider_rw = factory.provider_rw()?;
2118 provider_rw.append_blocks_with_state(
2119 blocks
2120 .into_iter()
2121 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2122 .collect(),
2123 &ExecutionOutcome {
2124 bundle: BundleState::new(
2125 [(address, None, Some(account.into()), {
2126 let mut s = HashMap::default();
2127 s.insert(slot, (U256::ZERO, U256::from(100)));
2128 s
2129 })],
2130 [
2131 Vec::new(),
2132 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2133 ],
2134 [],
2135 ),
2136 first_block: 0,
2137 ..Default::default()
2138 },
2139 Default::default(),
2140 )?;
2141
2142 provider_rw.tx_ref().put::<tables::HashedStorages>(
2143 hashed_address,
2144 StorageEntry { key: hashed_slot, value: U256::from(100) },
2145 )?;
2146 provider_rw.tx_ref().put::<tables::HashedAccounts>(hashed_address, account)?;
2147
2148 provider_rw.commit()?;
2149
2150 let provider = BlockchainProvider::new(factory)?;
2151 let consistent_provider = provider.consistent_provider()?;
2152
2153 let outcome =
2154 consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2155
2156 let state = &outcome.bundle.state;
2157 let account_state = state.get(&address).expect("should have account in bundle state");
2158 let storage = &account_state.storage;
2159
2160 let slot_as_u256 = U256::from_be_bytes(*hashed_slot);
2161 let storage_slot = storage.get(&slot_as_u256).expect("should have the slot in storage");
2162
2163 assert_eq!(
2164 storage_slot.present_value,
2165 U256::from(100),
2166 "present_value should be 100 (the actual value in HashedStorages)"
2167 );
2168
2169 Ok(())
2170 }
2171
2172 #[test]
2173 #[cfg(all(unix, feature = "rocksdb"))]
2174 fn test_get_state_storage_value_hashed_state_historical() -> eyre::Result<()> {
2175 use alloy_primitives::{keccak256, U256};
2176 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2177 use reth_primitives_traits::StorageEntry;
2178 use reth_storage_api::StorageSettingsCache;
2179 use std::collections::HashMap;
2180
2181 let address = alloy_primitives::Address::with_last_byte(1);
2182 let account = reth_primitives_traits::Account {
2183 nonce: 1,
2184 balance: U256::from(1000),
2185 bytecode_hash: None,
2186 };
2187 let slot = U256::from(0x42);
2188 let slot_b256 = B256::from(slot);
2189 let hashed_address = keccak256(address);
2190 let hashed_slot = keccak256(slot_b256);
2191
2192 let mut rng = generators::rng();
2193 let factory = create_test_provider_factory();
2194 factory.set_storage_settings_cache(StorageSettings::v2());
2195
2196 let blocks = random_block_range(
2197 &mut rng,
2198 0..=3,
2199 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2200 );
2201
2202 let provider_rw = factory.provider_rw()?;
2203 provider_rw.append_blocks_with_state(
2204 blocks
2205 .into_iter()
2206 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2207 .collect(),
2208 &ExecutionOutcome {
2209 bundle: BundleState::new(
2210 [(address, None, Some(account.into()), {
2211 let mut s = HashMap::default();
2212 s.insert(slot, (U256::ZERO, U256::from(300)));
2213 s
2214 })],
2215 [
2216 Vec::new(),
2217 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2218 vec![(address, Some(Some(account.into())), vec![(slot, U256::from(100))])],
2219 vec![(address, Some(Some(account.into())), vec![(slot, U256::from(200))])],
2220 ],
2221 [],
2222 ),
2223 first_block: 0,
2224 ..Default::default()
2225 },
2226 Default::default(),
2227 )?;
2228
2229 provider_rw.tx_ref().put::<tables::HashedStorages>(
2230 hashed_address,
2231 StorageEntry { key: hashed_slot, value: U256::from(300) },
2232 )?;
2233 provider_rw.tx_ref().put::<tables::HashedAccounts>(hashed_address, account)?;
2234
2235 provider_rw.commit()?;
2236
2237 let provider = BlockchainProvider::new(factory)?;
2238 let consistent_provider = provider.consistent_provider()?;
2239
2240 let outcome =
2241 consistent_provider.get_state(1..=2)?.expect("should return execution outcome");
2242
2243 let state = &outcome.bundle.state;
2244 let account_state = state.get(&address).expect("should have account in bundle state");
2245 let storage = &account_state.storage;
2246
2247 let slot_as_u256 = U256::from_be_bytes(*hashed_slot);
2248 let storage_slot = storage.get(&slot_as_u256).expect("should have the slot in storage");
2249
2250 assert_eq!(
2251 storage_slot.present_value,
2252 U256::from(200),
2253 "present_value should be 200 (the value at block 2, not 300 which is the latest)"
2254 );
2255
2256 Ok(())
2257 }
2258
2259 #[test]
2260 fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
2261 use alloy_primitives::U256;
2262 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2263 use reth_primitives_traits::StorageEntry;
2264 use reth_storage_api::StorageSettingsCache;
2265 use std::collections::HashMap;
2266
2267 let address = alloy_primitives::Address::with_last_byte(1);
2268 let account = reth_primitives_traits::Account {
2269 nonce: 1,
2270 balance: U256::from(1000),
2271 bytecode_hash: None,
2272 };
2273 let slot = U256::from(0x42);
2274 let slot_b256 = B256::from(slot);
2275
2276 let mut rng = generators::rng();
2277 let factory = create_test_provider_factory();
2278 factory.set_storage_settings_cache(StorageSettings::v1());
2279
2280 let blocks = random_block_range(
2281 &mut rng,
2282 0..=1,
2283 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2284 );
2285
2286 let provider_rw = factory.provider_rw()?;
2287 provider_rw.append_blocks_with_state(
2288 blocks
2289 .into_iter()
2290 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2291 .collect(),
2292 &ExecutionOutcome {
2293 bundle: BundleState::new(
2294 [(address, None, Some(account.into()), {
2295 let mut s = HashMap::default();
2296 s.insert(slot, (U256::ZERO, U256::from(100)));
2297 s
2298 })],
2299 [
2300 Vec::new(),
2301 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2302 ],
2303 [],
2304 ),
2305 first_block: 0,
2306 ..Default::default()
2307 },
2308 Default::default(),
2309 )?;
2310
2311 provider_rw.tx_ref().put::<tables::PlainStorageState>(
2312 address,
2313 StorageEntry { key: slot_b256, value: U256::from(100) },
2314 )?;
2315 provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
2316
2317 provider_rw.commit()?;
2318
2319 let provider = BlockchainProvider::new(factory)?;
2320 let consistent_provider = provider.consistent_provider()?;
2321
2322 let outcome =
2323 consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2324
2325 let state = &outcome.bundle.state;
2326 let account_state = state.get(&address).expect("should have account in bundle state");
2327 let storage = &account_state.storage;
2328
2329 let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2330
2331 assert_eq!(
2332 storage_slot.present_value,
2333 U256::from(100),
2334 "present_value should be 100 (the actual value in PlainStorageState)"
2335 );
2336
2337 Ok(())
2338 }
2339
2340 #[test]
2341 fn test_storage_changeset_consistent_keys_hashed_state() -> eyre::Result<()> {
2342 use alloy_primitives::{keccak256, U256};
2343 use reth_db_api::models::StorageSettings;
2344 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2345 use std::collections::HashMap;
2346
2347 let mut rng = generators::rng();
2348 let factory = create_test_provider_factory();
2349 factory.set_storage_settings_cache(StorageSettings::v2());
2350
2351 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2352
2353 let address = alloy_primitives::Address::with_last_byte(1);
2354 let account = reth_primitives_traits::Account {
2355 nonce: 1,
2356 balance: U256::from(1000),
2357 bytecode_hash: None,
2358 };
2359 let slot = U256::from(0x42);
2360
2361 let provider_rw = factory.provider_rw()?;
2362 provider_rw.append_blocks_with_state(
2363 database_blocks
2364 .into_iter()
2365 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2366 .collect(),
2367 &ExecutionOutcome {
2368 bundle: BundleState::new(
2369 [(address, None, Some(account.into()), {
2370 let mut s = HashMap::default();
2371 s.insert(slot, (U256::ZERO, U256::from(100)));
2372 s
2373 })],
2374 [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2375 [],
2376 ),
2377 first_block: 0,
2378 ..Default::default()
2379 },
2380 Default::default(),
2381 )?;
2382 provider_rw.commit()?;
2383
2384 let provider = BlockchainProvider::new(factory)?;
2385
2386 let in_mem_block = in_memory_blocks.first().unwrap();
2387 let senders = in_mem_block.senders().expect("failed to recover senders");
2388 let chain = NewCanonicalChain::Commit {
2389 new: vec![ExecutedBlock {
2390 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2391 in_mem_block.clone(),
2392 senders,
2393 )),
2394 execution_output: Arc::new(BlockExecutionOutput {
2395 state: BundleState::new(
2396 [(address, None, Some(account.into()), {
2397 let mut s = HashMap::default();
2398 s.insert(slot, (U256::from(100), U256::from(200)));
2399 s
2400 })],
2401 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2402 [],
2403 ),
2404 result: BlockExecutionResult {
2405 receipts: Default::default(),
2406 requests: Default::default(),
2407 gas_used: 0,
2408 blob_gas_used: 0,
2409 },
2410 }),
2411 ..Default::default()
2412 }],
2413 };
2414 provider.canonical_in_memory_state.update_chain(chain);
2415
2416 let consistent_provider = provider.consistent_provider()?;
2417
2418 let db_changeset = consistent_provider.storage_changeset(0)?;
2419 let mem_changeset = consistent_provider.storage_changeset(1)?;
2420
2421 let slot_b256 = B256::from(slot);
2422 let _hashed_slot_b256 = keccak256(slot_b256);
2423
2424 assert_eq!(db_changeset.len(), 1);
2425 assert_eq!(mem_changeset.len(), 1);
2426
2427 let db_key = db_changeset[0].1.key;
2428 let mem_key = mem_changeset[0].1.key;
2429
2430 assert_eq!(
2431 db_key, mem_key,
2432 "DB and in-memory changesets should return the same key format (hashed) for the same logical slot"
2433 );
2434
2435 Ok(())
2436 }
2437
2438 #[test]
2439 fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2440 use alloy_primitives::U256;
2441 use reth_db_api::models::StorageSettings;
2442 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2443 use std::collections::HashMap;
2444
2445 let mut rng = generators::rng();
2446 let factory = create_test_provider_factory();
2447 factory.set_storage_settings_cache(StorageSettings::v1());
2448
2449 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2450
2451 let address = alloy_primitives::Address::with_last_byte(1);
2452 let account = reth_primitives_traits::Account {
2453 nonce: 1,
2454 balance: U256::from(1000),
2455 bytecode_hash: None,
2456 };
2457 let slot = U256::from(0x42);
2458
2459 let provider_rw = factory.provider_rw()?;
2460 provider_rw.append_blocks_with_state(
2461 database_blocks
2462 .into_iter()
2463 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2464 .collect(),
2465 &ExecutionOutcome {
2466 bundle: BundleState::new(
2467 [(address, None, Some(account.into()), {
2468 let mut s = HashMap::default();
2469 s.insert(slot, (U256::ZERO, U256::from(100)));
2470 s
2471 })],
2472 [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2473 [],
2474 ),
2475 first_block: 0,
2476 ..Default::default()
2477 },
2478 Default::default(),
2479 )?;
2480 provider_rw.commit()?;
2481
2482 let provider = BlockchainProvider::new(factory)?;
2483
2484 let in_mem_block = in_memory_blocks.first().unwrap();
2485 let senders = in_mem_block.senders().expect("failed to recover senders");
2486 let chain = NewCanonicalChain::Commit {
2487 new: vec![ExecutedBlock {
2488 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2489 in_mem_block.clone(),
2490 senders,
2491 )),
2492 execution_output: Arc::new(BlockExecutionOutput {
2493 state: BundleState::new(
2494 [(address, None, Some(account.into()), {
2495 let mut s = HashMap::default();
2496 s.insert(slot, (U256::from(100), U256::from(200)));
2497 s
2498 })],
2499 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2500 [],
2501 ),
2502 result: BlockExecutionResult {
2503 receipts: Default::default(),
2504 requests: Default::default(),
2505 gas_used: 0,
2506 blob_gas_used: 0,
2507 },
2508 }),
2509 ..Default::default()
2510 }],
2511 };
2512 provider.canonical_in_memory_state.update_chain(chain);
2513
2514 let consistent_provider = provider.consistent_provider()?;
2515
2516 let db_changeset = consistent_provider.storage_changeset(0)?;
2517 let mem_changeset = consistent_provider.storage_changeset(1)?;
2518
2519 let slot_b256 = B256::from(slot);
2520
2521 assert_eq!(db_changeset.len(), 1);
2522 assert_eq!(mem_changeset.len(), 1);
2523
2524 let db_key = db_changeset[0].1.key.as_b256();
2525 let mem_key = mem_changeset[0].1.key.as_b256();
2526
2527 assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2528 assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2529 assert_eq!(
2530 db_key, mem_key,
2531 "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2532 );
2533
2534 Ok(())
2535 }
2536
2537 #[test]
2538 fn test_storage_changesets_range_consistent_keys_hashed_state() -> eyre::Result<()> {
2539 use alloy_primitives::U256;
2540 use reth_db_api::models::StorageSettings;
2541 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2542 use std::collections::HashMap;
2543
2544 let mut rng = generators::rng();
2545 let factory = create_test_provider_factory();
2546 factory.set_storage_settings_cache(StorageSettings::v2());
2547
2548 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2549
2550 let address = alloy_primitives::Address::with_last_byte(1);
2551 let account = reth_primitives_traits::Account {
2552 nonce: 1,
2553 balance: U256::from(1000),
2554 bytecode_hash: None,
2555 };
2556 let slot = U256::from(0x42);
2557
2558 let provider_rw = factory.provider_rw()?;
2559 provider_rw.append_blocks_with_state(
2560 database_blocks
2561 .into_iter()
2562 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2563 .collect(),
2564 &ExecutionOutcome {
2565 bundle: BundleState::new(
2566 [(address, None, Some(account.into()), {
2567 let mut s = HashMap::default();
2568 s.insert(slot, (U256::ZERO, U256::from(100)));
2569 s
2570 })],
2571 vec![
2572 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2573 vec![],
2574 ],
2575 [],
2576 ),
2577 first_block: 0,
2578 ..Default::default()
2579 },
2580 Default::default(),
2581 )?;
2582 provider_rw.commit()?;
2583
2584 let provider = BlockchainProvider::new(factory)?;
2585
2586 let in_mem_block = in_memory_blocks.first().unwrap();
2587 let senders = in_mem_block.senders().expect("failed to recover senders");
2588 let chain = NewCanonicalChain::Commit {
2589 new: vec![ExecutedBlock {
2590 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2591 in_mem_block.clone(),
2592 senders,
2593 )),
2594 execution_output: Arc::new(BlockExecutionOutput {
2595 state: BundleState::new(
2596 [(address, None, Some(account.into()), {
2597 let mut s = HashMap::default();
2598 s.insert(slot, (U256::from(100), U256::from(200)));
2599 s
2600 })],
2601 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2602 [],
2603 ),
2604 result: BlockExecutionResult {
2605 receipts: Default::default(),
2606 requests: Default::default(),
2607 gas_used: 0,
2608 blob_gas_used: 0,
2609 },
2610 }),
2611 ..Default::default()
2612 }],
2613 };
2614 provider.canonical_in_memory_state.update_chain(chain);
2615
2616 let consistent_provider = provider.consistent_provider()?;
2617
2618 let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2619
2620 assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2621
2622 let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key.as_b256()).collect();
2623
2624 assert_eq!(
2625 keys[0], keys[1],
2626 "same logical slot should produce identical keys whether from DB or memory"
2627 );
2628
2629 Ok(())
2630 }
2631
2632 #[test]
2633 fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2634 use alloy_primitives::U256;
2635 use reth_db_api::models::StorageSettings;
2636 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2637 use std::collections::HashMap;
2638
2639 let mut rng = generators::rng();
2640 let factory = create_test_provider_factory();
2641 factory.set_storage_settings_cache(StorageSettings::v1());
2642
2643 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2644
2645 let address = alloy_primitives::Address::with_last_byte(1);
2646 let account = reth_primitives_traits::Account {
2647 nonce: 1,
2648 balance: U256::from(1000),
2649 bytecode_hash: None,
2650 };
2651 let slot = U256::from(0x42);
2652
2653 let provider_rw = factory.provider_rw()?;
2654 provider_rw.append_blocks_with_state(
2655 database_blocks
2656 .into_iter()
2657 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2658 .collect(),
2659 &ExecutionOutcome {
2660 bundle: BundleState::new(
2661 [(address, None, Some(account.into()), {
2662 let mut s = HashMap::default();
2663 s.insert(slot, (U256::ZERO, U256::from(100)));
2664 s
2665 })],
2666 vec![
2667 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2668 vec![],
2669 ],
2670 [],
2671 ),
2672 first_block: 0,
2673 ..Default::default()
2674 },
2675 Default::default(),
2676 )?;
2677 provider_rw.commit()?;
2678
2679 let provider = BlockchainProvider::new(factory)?;
2680
2681 let in_mem_block = in_memory_blocks.first().unwrap();
2682 let senders = in_mem_block.senders().expect("failed to recover senders");
2683 let chain = NewCanonicalChain::Commit {
2684 new: vec![ExecutedBlock {
2685 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2686 in_mem_block.clone(),
2687 senders,
2688 )),
2689 execution_output: Arc::new(BlockExecutionOutput {
2690 state: BundleState::new(
2691 [(address, None, Some(account.into()), {
2692 let mut s = HashMap::default();
2693 s.insert(slot, (U256::from(100), U256::from(200)));
2694 s
2695 })],
2696 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2697 [],
2698 ),
2699 result: BlockExecutionResult {
2700 receipts: Default::default(),
2701 requests: Default::default(),
2702 gas_used: 0,
2703 blob_gas_used: 0,
2704 },
2705 }),
2706 ..Default::default()
2707 }],
2708 };
2709 provider.canonical_in_memory_state.update_chain(chain);
2710
2711 let consistent_provider = provider.consistent_provider()?;
2712
2713 let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2714
2715 assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2716
2717 let slot_b256 = B256::from(slot);
2718 let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key.as_b256()).collect();
2719
2720 assert_eq!(
2721 keys[0], keys[1],
2722 "same logical slot should produce identical keys whether from DB or memory"
2723 );
2724 assert_eq!(
2725 keys[0], slot_b256,
2726 "keys should be plain/unhashed when use_hashed_state is false"
2727 );
2728
2729 Ok(())
2730 }
2731}