1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{
16 map::{hash_map, HashMap},
17 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30 StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
224 &self,
225 account_changeset: Vec<(u64, AccountBeforeTx)>,
226 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
227 block_range_end: BlockNumber,
228 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
229 let mut state: BundleStateInit = HashMap::default();
230 let mut reverts: RevertsInit = HashMap::default();
231 let state_provider = self.state_by_block_number_ref(block_range_end)?;
232
233 for (block_number, account_before) in account_changeset.into_iter().rev() {
235 let AccountBeforeTx { info: old_info, address } = account_before;
236 match state.entry(address) {
237 hash_map::Entry::Vacant(entry) => {
238 let new_info = state_provider.basic_account(&address)?;
239 entry.insert((old_info, new_info, HashMap::default()));
240 }
241 hash_map::Entry::Occupied(mut entry) => {
242 entry.get_mut().0 = old_info;
244 }
245 }
246 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
248 }
249
250 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
252 let BlockNumberAddress((block_number, address)) = block_and_address;
253 let account_state = match state.entry(address) {
255 hash_map::Entry::Vacant(entry) => {
256 let present_info = state_provider.basic_account(&address)?;
257 entry.insert((present_info, present_info, HashMap::default()))
258 }
259 hash_map::Entry::Occupied(entry) => entry.into_mut(),
260 };
261
262 match account_state.2.entry(old_storage.key) {
264 hash_map::Entry::Vacant(entry) => {
265 let new_storage_value =
266 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
267 entry.insert((old_storage.value, new_storage_value));
268 }
269 hash_map::Entry::Occupied(mut entry) => {
270 entry.get_mut().0 = old_storage.value;
271 }
272 };
273
274 reverts
275 .entry(block_number)
276 .or_default()
277 .entry(address)
278 .or_default()
279 .1
280 .push(old_storage);
281 }
282
283 Ok((state, reverts))
284 }
285
286 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298 &self,
299 range: impl RangeBounds<BlockNumber>,
300 fetch_db_range: F,
301 map_block_state_item: G,
302 mut predicate: P,
303 ) -> ProviderResult<Vec<T>>
304 where
305 F: FnOnce(
306 &DatabaseProviderRO<N::DB, N>,
307 RangeInclusive<BlockNumber>,
308 &mut P,
309 ) -> ProviderResult<Vec<T>>,
310 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311 P: FnMut(&T) -> bool,
312 {
313 let mut in_memory_chain =
321 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322 let db_provider = &self.storage_provider;
323
324 let (start, end) = self.convert_range_bounds(range, || {
325 in_memory_chain
327 .first()
328 .map(|b| b.number())
329 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330 });
331
332 if start > end {
333 return Ok(vec![])
334 }
335
336 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341 Some(lowest_memory_block) if lowest_memory_block <= end => {
342 let highest_memory_block =
343 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345 let in_memory_range =
349 lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351 in_memory_chain.truncate(
354 in_memory_chain
355 .len()
356 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357 );
358
359 let storage_range =
360 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362 (Some((in_memory_chain, in_memory_range)), storage_range)
363 }
364 _ => {
365 drop(in_memory_chain);
367
368 (None, Some(start..=end))
369 }
370 };
371
372 let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374 if let Some(storage_range) = storage_range {
375 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376 items.append(&mut db_items);
377
378 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381 return Ok(items)
382 }
383 }
384
385 if let Some((in_memory_chain, in_memory_range)) = in_memory {
386 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387 debug_assert!(num == block.number());
388 if let Some(item) = map_block_state_item(block, &mut predicate) {
389 items.push(item);
390 } else {
391 break
392 }
393 }
394 }
395
396 Ok(items)
397 }
398
399 fn block_state_provider_ref(
401 &self,
402 state: &BlockState<N::Primitives>,
403 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404 let anchor_hash = state.anchor().hash;
405 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408 }
409
410 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416 &self,
417 range: impl RangeBounds<BlockNumber>,
418 fetch_from_db: S,
419 fetch_from_block_state: M,
420 ) -> ProviderResult<Vec<R>>
421 where
422 S: FnOnce(
423 &DatabaseProviderRO<N::DB, N>,
424 RangeInclusive<TxNumber>,
425 ) -> ProviderResult<Vec<R>>,
426 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427 {
428 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429 let provider = &self.storage_provider;
430
431 let last_database_block_number = in_mem_chain
434 .last()
435 .map(|b| Ok(b.anchor().number))
436 .unwrap_or_else(|| provider.last_block_number())?;
437
438 let last_block_body_index = provider
441 .block_body_indices(last_database_block_number)?
442 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445 let (start, end) = self.convert_range_bounds(range, || {
446 in_mem_chain
447 .iter()
448 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449 .sum::<u64>() +
450 last_block_body_index.last_tx_num()
451 });
452
453 if start > end {
454 return Ok(vec![])
455 }
456
457 let mut tx_range = start..=end;
458
459 if *tx_range.end() < in_memory_tx_num {
462 return fetch_from_db(provider, tx_range);
463 }
464
465 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467 if *tx_range.start() < in_memory_tx_num {
469 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472 tx_range = in_memory_tx_num..=*tx_range.end();
474
475 items.extend(fetch_from_db(provider, db_range)?);
476 }
477
478 for block_state in in_mem_chain.iter().rev() {
480 let block_tx_count =
481 block_state.block_ref().recovered_block().body().transactions().len();
482 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487 in_memory_tx_num += block_tx_count as u64;
488 continue
489 }
490
491 let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494 items.extend(fetch_from_block_state(
495 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496 block_state,
497 )?);
498
499 in_memory_tx_num += block_tx_count as u64;
500
501 if in_memory_tx_num > *tx_range.end() {
503 break
504 }
505
506 tx_range = in_memory_tx_num..=*tx_range.end();
508 }
509
510 Ok(items)
511 }
512
513 fn get_in_memory_or_storage_by_tx<S, M, R>(
516 &self,
517 id: HashOrNumber,
518 fetch_from_db: S,
519 fetch_from_block_state: M,
520 ) -> ProviderResult<Option<R>>
521 where
522 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524 {
525 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526 let provider = &self.storage_provider;
527
528 let last_database_block_number = in_mem_chain
531 .last()
532 .map(|b| Ok(b.anchor().number))
533 .unwrap_or_else(|| provider.last_block_number())?;
534
535 let last_block_body_index = provider
538 .block_body_indices(last_database_block_number)?
539 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542 if let HashOrNumber::Number(id) = id &&
545 id < in_memory_tx_num
546 {
547 return fetch_from_db(provider)
548 }
549
550 for block_state in in_mem_chain.iter().rev() {
552 let executed_block = block_state.block_ref();
553 let block = executed_block.recovered_block();
554
555 for tx_index in 0..block.body().transactions().len() {
556 match id {
557 HashOrNumber::Hash(tx_hash) => {
558 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 HashOrNumber::Number(id) => {
563 if id == in_memory_tx_num {
564 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565 }
566 }
567 }
568
569 in_memory_tx_num += 1;
570 }
571 }
572
573 if let HashOrNumber::Hash(_) = id {
575 return fetch_from_db(provider)
576 }
577
578 Ok(None)
579 }
580
581 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583 &self,
584 id: BlockHashOrNumber,
585 fetch_from_db: S,
586 fetch_from_block_state: M,
587 ) -> ProviderResult<R>
588 where
589 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591 {
592 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593 return fetch_from_block_state(block_state)
594 }
595 fetch_from_db(&self.storage_provider)
596 }
597
598 pub(crate) fn into_state_provider_at_block_hash(
600 self,
601 block_hash: BlockHash,
602 ) -> ProviderResult<StateProviderBox> {
603 let Self { storage_provider, head_block, .. } = self;
604 let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
605 let block_number = storage_provider
606 .block_number(block_hash)?
607 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
608 storage_provider.try_into_history_at_block(block_number)
609 };
610 if let Some(Some(block_state)) =
611 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
612 {
613 let anchor_hash = block_state.anchor().hash;
614 let latest_historical = into_history_at_block_hash(anchor_hash)?;
615 return Ok(Box::new(block_state.state_provider(latest_historical)));
616 }
617 into_history_at_block_hash(block_hash)
618 }
619}
620
621impl<N: ProviderNodeTypes> ConsistentProvider<N> {
622 #[inline]
631 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
632 let latest = self.best_block_number()?;
633 if block_number > latest {
634 Err(ProviderError::HeaderNotFound(block_number.into()))
635 } else {
636 Ok(())
637 }
638 }
639}
640
641impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
642 type Primitives = N::Primitives;
643}
644
645impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
646 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
647 self.storage_provider.static_file_provider()
648 }
649
650 fn get_static_file_writer(
651 &self,
652 block: BlockNumber,
653 segment: StaticFileSegment,
654 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
655 self.storage_provider.get_static_file_writer(block, segment)
656 }
657}
658
659impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
660 type Header = HeaderTy<N>;
661
662 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
663 self.get_in_memory_or_storage_by_block(
664 block_hash.into(),
665 |db_provider| db_provider.header(block_hash),
666 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
667 )
668 }
669
670 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
671 self.get_in_memory_or_storage_by_block(
672 num.into(),
673 |db_provider| db_provider.header_by_number(num),
674 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
675 )
676 }
677
678 fn headers_range(
679 &self,
680 range: impl RangeBounds<BlockNumber>,
681 ) -> ProviderResult<Vec<Self::Header>> {
682 self.get_in_memory_or_storage_by_block_range_while(
683 range,
684 |db_provider, range, _| db_provider.headers_range(range),
685 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
686 |_| true,
687 )
688 }
689
690 fn sealed_header(
691 &self,
692 number: BlockNumber,
693 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
694 self.get_in_memory_or_storage_by_block(
695 number.into(),
696 |db_provider| db_provider.sealed_header(number),
697 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
698 )
699 }
700
701 fn sealed_headers_range(
702 &self,
703 range: impl RangeBounds<BlockNumber>,
704 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
705 self.get_in_memory_or_storage_by_block_range_while(
706 range,
707 |db_provider, range, _| db_provider.sealed_headers_range(range),
708 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
709 |_| true,
710 )
711 }
712
713 fn sealed_headers_while(
714 &self,
715 range: impl RangeBounds<BlockNumber>,
716 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
717 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
718 self.get_in_memory_or_storage_by_block_range_while(
719 range,
720 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
721 |block_state, predicate| {
722 let header = block_state.block_ref().recovered_block().sealed_header();
723 predicate(header).then(|| header.clone())
724 },
725 predicate,
726 )
727 }
728}
729
730impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
731 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
732 self.get_in_memory_or_storage_by_block(
733 number.into(),
734 |db_provider| db_provider.block_hash(number),
735 |block_state| Ok(Some(block_state.hash())),
736 )
737 }
738
739 fn canonical_hashes_range(
740 &self,
741 start: BlockNumber,
742 end: BlockNumber,
743 ) -> ProviderResult<Vec<B256>> {
744 self.get_in_memory_or_storage_by_block_range_while(
745 start..end,
746 |db_provider, inclusive_range, _| {
747 db_provider
748 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
749 },
750 |block_state, _| Some(block_state.hash()),
751 |_| true,
752 )
753 }
754}
755
756impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
757 fn chain_info(&self) -> ProviderResult<ChainInfo> {
758 let best_number = self.best_block_number()?;
759 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
760 }
761
762 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
763 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
764 }
765
766 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
767 self.storage_provider.last_block_number()
768 }
769
770 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
771 self.get_in_memory_or_storage_by_block(
772 hash.into(),
773 |db_provider| db_provider.block_number(hash),
774 |block_state| Ok(Some(block_state.number())),
775 )
776 }
777}
778
779impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
780 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
781 Ok(self.canonical_in_memory_state.pending_block_num_hash())
782 }
783
784 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
785 Ok(self.canonical_in_memory_state.get_safe_num_hash())
786 }
787
788 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
789 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
790 }
791}
792
793impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
794 type Block = BlockTy<N>;
795
796 fn find_block_by_hash(
797 &self,
798 hash: B256,
799 source: BlockSource,
800 ) -> ProviderResult<Option<Self::Block>> {
801 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
802 let Some(block) = self.get_in_memory_or_storage_by_block(
803 hash.into(),
804 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
805 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
806 )?
807 {
808 return Ok(Some(block))
809 }
810
811 if matches!(source, BlockSource::Pending | BlockSource::Any) {
812 return Ok(self
813 .canonical_in_memory_state
814 .pending_block()
815 .filter(|b| b.hash() == hash)
816 .map(|b| b.into_block()))
817 }
818
819 Ok(None)
820 }
821
822 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
823 self.get_in_memory_or_storage_by_block(
824 id,
825 |db_provider| db_provider.block(id),
826 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
827 )
828 }
829
830 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
831 Ok(self.canonical_in_memory_state.pending_recovered_block())
832 }
833
834 fn pending_block_and_receipts(
835 &self,
836 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
837 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
838 }
839
840 fn recovered_block(
847 &self,
848 id: BlockHashOrNumber,
849 transaction_kind: TransactionVariant,
850 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851 self.get_in_memory_or_storage_by_block(
852 id,
853 |db_provider| db_provider.recovered_block(id, transaction_kind),
854 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
855 )
856 }
857
858 fn sealed_block_with_senders(
859 &self,
860 id: BlockHashOrNumber,
861 transaction_kind: TransactionVariant,
862 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
863 self.get_in_memory_or_storage_by_block(
864 id,
865 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
866 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
867 )
868 }
869
870 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
871 self.get_in_memory_or_storage_by_block_range_while(
872 range,
873 |db_provider, range, _| db_provider.block_range(range),
874 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
875 |_| true,
876 )
877 }
878
879 fn block_with_senders_range(
880 &self,
881 range: RangeInclusive<BlockNumber>,
882 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
883 self.get_in_memory_or_storage_by_block_range_while(
884 range,
885 |db_provider, range, _| db_provider.block_with_senders_range(range),
886 |block_state, _| Some(block_state.block().recovered_block().clone()),
887 |_| true,
888 )
889 }
890
891 fn recovered_block_range(
892 &self,
893 range: RangeInclusive<BlockNumber>,
894 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
895 self.get_in_memory_or_storage_by_block_range_while(
896 range,
897 |db_provider, range, _| db_provider.recovered_block_range(range),
898 |block_state, _| Some(block_state.block().recovered_block().clone()),
899 |_| true,
900 )
901 }
902
903 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
904 self.get_in_memory_or_storage_by_tx(
905 id.into(),
906 |db_provider| db_provider.block_by_transaction_id(id),
907 |_, _, block_state| Ok(Some(block_state.number())),
908 )
909 }
910}
911
912impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
913 type Transaction = TxTy<N>;
914
915 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
916 self.get_in_memory_or_storage_by_tx(
917 tx_hash.into(),
918 |db_provider| db_provider.transaction_id(tx_hash),
919 |_, tx_number, _| Ok(Some(tx_number)),
920 )
921 }
922
923 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
924 self.get_in_memory_or_storage_by_tx(
925 id.into(),
926 |provider| provider.transaction_by_id(id),
927 |tx_index, _, block_state| {
928 Ok(block_state
929 .block_ref()
930 .recovered_block()
931 .body()
932 .transactions()
933 .get(tx_index)
934 .cloned())
935 },
936 )
937 }
938
939 fn transaction_by_id_unhashed(
940 &self,
941 id: TxNumber,
942 ) -> ProviderResult<Option<Self::Transaction>> {
943 self.get_in_memory_or_storage_by_tx(
944 id.into(),
945 |provider| provider.transaction_by_id_unhashed(id),
946 |tx_index, _, block_state| {
947 Ok(block_state
948 .block_ref()
949 .recovered_block()
950 .body()
951 .transactions()
952 .get(tx_index)
953 .cloned())
954 },
955 )
956 }
957
958 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
959 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
960 return Ok(Some(tx))
961 }
962
963 self.storage_provider.transaction_by_hash(hash)
964 }
965
966 fn transaction_by_hash_with_meta(
967 &self,
968 tx_hash: TxHash,
969 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
970 if let Some((tx, meta)) =
971 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
972 {
973 return Ok(Some((tx, meta)))
974 }
975
976 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
977 }
978
979 fn transactions_by_block(
980 &self,
981 id: BlockHashOrNumber,
982 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
983 self.get_in_memory_or_storage_by_block(
984 id,
985 |provider| provider.transactions_by_block(id),
986 |block_state| {
987 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
988 },
989 )
990 }
991
992 fn transactions_by_block_range(
993 &self,
994 range: impl RangeBounds<BlockNumber>,
995 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
996 self.get_in_memory_or_storage_by_block_range_while(
997 range,
998 |db_provider, range, _| db_provider.transactions_by_block_range(range),
999 |block_state, _| {
1000 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1001 },
1002 |_| true,
1003 )
1004 }
1005
1006 fn transactions_by_tx_range(
1007 &self,
1008 range: impl RangeBounds<TxNumber>,
1009 ) -> ProviderResult<Vec<Self::Transaction>> {
1010 self.get_in_memory_or_storage_by_tx_range(
1011 range,
1012 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1013 |index_range, block_state| {
1014 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1015 .to_vec())
1016 },
1017 )
1018 }
1019
1020 fn senders_by_tx_range(
1021 &self,
1022 range: impl RangeBounds<TxNumber>,
1023 ) -> ProviderResult<Vec<Address>> {
1024 self.get_in_memory_or_storage_by_tx_range(
1025 range,
1026 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1027 |index_range, block_state| {
1028 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1029 },
1030 )
1031 }
1032
1033 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1034 self.get_in_memory_or_storage_by_tx(
1035 id.into(),
1036 |provider| provider.transaction_sender(id),
1037 |tx_index, _, block_state| {
1038 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1039 },
1040 )
1041 }
1042}
1043
1044impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1045 type Receipt = ReceiptTy<N>;
1046
1047 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1048 self.get_in_memory_or_storage_by_tx(
1049 id.into(),
1050 |provider| provider.receipt(id),
1051 |tx_index, _, block_state| {
1052 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1053 },
1054 )
1055 }
1056
1057 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1058 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1059 let executed_block = block_state.block_ref();
1060 let block = executed_block.recovered_block();
1061 let receipts = block_state.executed_block_receipts_ref();
1062
1063 debug_assert_eq!(
1065 block.body().transactions().len(),
1066 receipts.len(),
1067 "Mismatch between transaction and receipt count"
1068 );
1069
1070 if let Some(tx_index) =
1071 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1072 {
1073 return Ok(receipts.get(tx_index).cloned());
1075 }
1076 }
1077
1078 self.storage_provider.receipt_by_hash(hash)
1079 }
1080
1081 fn receipts_by_block(
1082 &self,
1083 block: BlockHashOrNumber,
1084 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1085 self.get_in_memory_or_storage_by_block(
1086 block,
1087 |db_provider| db_provider.receipts_by_block(block),
1088 |block_state| Ok(Some(block_state.executed_block_receipts())),
1089 )
1090 }
1091
1092 fn receipts_by_tx_range(
1093 &self,
1094 range: impl RangeBounds<TxNumber>,
1095 ) -> ProviderResult<Vec<Self::Receipt>> {
1096 self.get_in_memory_or_storage_by_tx_range(
1097 range,
1098 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1099 |index_range, block_state| {
1100 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1101 },
1102 )
1103 }
1104
1105 fn receipts_by_block_range(
1106 &self,
1107 block_range: RangeInclusive<BlockNumber>,
1108 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1109 self.storage_provider.receipts_by_block_range(block_range)
1110 }
1111}
1112
1113impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1114 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1115 match block {
1116 BlockId::Hash(rpc_block_hash) => {
1117 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1118 if receipts.is_none() &&
1119 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1120 let Some(state) = self
1121 .head_block
1122 .as_ref()
1123 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1124 {
1125 receipts = Some(state.executed_block_receipts());
1126 }
1127 Ok(receipts)
1128 }
1129 BlockId::Number(num_tag) => match num_tag {
1130 BlockNumberOrTag::Pending => Ok(self
1131 .canonical_in_memory_state
1132 .pending_state()
1133 .map(|block_state| block_state.executed_block_receipts())),
1134 _ => {
1135 if let Some(num) = self.convert_block_number(num_tag)? {
1136 self.receipts_by_block(num.into())
1137 } else {
1138 Ok(None)
1139 }
1140 }
1141 },
1142 }
1143 }
1144}
1145
1146impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1147 fn block_body_indices(
1148 &self,
1149 number: BlockNumber,
1150 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1151 self.get_in_memory_or_storage_by_block(
1152 number.into(),
1153 |db_provider| db_provider.block_body_indices(number),
1154 |block_state| {
1155 let last_storage_block_number = block_state.anchor().number;
1157 let mut stored_indices = self
1158 .storage_provider
1159 .block_body_indices(last_storage_block_number)?
1160 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1161
1162 stored_indices.first_tx_num = stored_indices.next_tx_num();
1164 stored_indices.tx_count = 0;
1165
1166 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1168 let block_tx_count =
1169 state.block_ref().recovered_block().body().transactions().len() as u64;
1170 if state.block_ref().recovered_block().number() == number {
1171 stored_indices.tx_count = block_tx_count;
1172 } else {
1173 stored_indices.first_tx_num += block_tx_count;
1174 }
1175 }
1176
1177 Ok(Some(stored_indices))
1178 },
1179 )
1180 }
1181
1182 fn block_body_indices_range(
1183 &self,
1184 range: RangeInclusive<BlockNumber>,
1185 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1186 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1187 }
1188}
1189
1190impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1191 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1192 self.storage_provider.get_stage_checkpoint(id)
1193 }
1194
1195 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1196 self.storage_provider.get_stage_checkpoint_progress(id)
1197 }
1198
1199 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1200 self.storage_provider.get_all_checkpoints()
1201 }
1202}
1203
1204impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1205 fn get_prune_checkpoint(
1206 &self,
1207 segment: PruneSegment,
1208 ) -> ProviderResult<Option<PruneCheckpoint>> {
1209 self.storage_provider.get_prune_checkpoint(segment)
1210 }
1211
1212 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1213 self.storage_provider.get_prune_checkpoints()
1214 }
1215}
1216
1217impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1218 type ChainSpec = N::ChainSpec;
1219
1220 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1221 ChainSpecProvider::chain_spec(&self.storage_provider)
1222 }
1223}
1224
1225impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1226 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1227 match id {
1228 BlockId::Number(num) => self.block_by_number_or_tag(num),
1229 BlockId::Hash(hash) => {
1230 if Some(true) == hash.require_canonical {
1235 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1237 } else {
1238 self.block_by_hash(hash.block_hash)
1239 }
1240 }
1241 }
1242 }
1243
1244 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1245 Ok(match id {
1246 BlockNumberOrTag::Latest => {
1247 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1248 }
1249 BlockNumberOrTag::Finalized => {
1250 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1251 }
1252 BlockNumberOrTag::Safe => {
1253 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1254 }
1255 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1256 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1257
1258 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1259 })
1260 }
1261
1262 fn sealed_header_by_number_or_tag(
1263 &self,
1264 id: BlockNumberOrTag,
1265 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1266 match id {
1267 BlockNumberOrTag::Latest => {
1268 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1269 }
1270 BlockNumberOrTag::Finalized => {
1271 Ok(self.canonical_in_memory_state.get_finalized_header())
1272 }
1273 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1274 BlockNumberOrTag::Earliest => self
1275 .header_by_number(self.earliest_block_number()?)?
1276 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1277 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1278 BlockNumberOrTag::Number(num) => self
1279 .header_by_number(num)?
1280 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1281 }
1282 }
1283
1284 fn sealed_header_by_id(
1285 &self,
1286 id: BlockId,
1287 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1288 Ok(match id {
1289 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1290 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1291 })
1292 }
1293
1294 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1295 Ok(match id {
1296 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1297 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1298 })
1299 }
1300}
1301
1302impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1303 fn storage_changeset(
1304 &self,
1305 block_number: BlockNumber,
1306 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1307 if let Some(state) =
1308 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1309 {
1310 let changesets = state
1311 .block()
1312 .execution_output
1313 .state
1314 .reverts
1315 .clone()
1316 .to_plain_state_reverts()
1317 .storage
1318 .into_iter()
1319 .flatten()
1320 .flat_map(|revert: PlainStorageRevert| {
1321 revert.storage_revert.into_iter().map(move |(key, value)| {
1322 let plain_key = B256::from(key.to_be_bytes());
1323 (
1324 BlockNumberAddress((block_number, revert.address)),
1325 StorageEntry { key: plain_key, value: value.to_previous_value() },
1326 )
1327 })
1328 })
1329 .collect();
1330 Ok(changesets)
1331 } else {
1332 let storage_history_exists = self
1336 .storage_provider
1337 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1338 .and_then(|checkpoint| {
1339 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1344 })
1345 .unwrap_or(true);
1346
1347 if !storage_history_exists {
1348 return Err(ProviderError::StateAtBlockPruned(block_number))
1349 }
1350
1351 self.storage_provider.storage_changeset(block_number)
1352 }
1353 }
1354
1355 fn get_storage_before_block(
1356 &self,
1357 block_number: BlockNumber,
1358 address: Address,
1359 storage_key: B256,
1360 ) -> ProviderResult<Option<StorageEntry>> {
1361 if let Some(state) =
1362 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1363 {
1364 let changeset = state
1365 .block_ref()
1366 .execution_output
1367 .state
1368 .reverts
1369 .clone()
1370 .to_plain_state_reverts()
1371 .storage
1372 .into_iter()
1373 .flatten()
1374 .find_map(|revert: PlainStorageRevert| {
1375 if revert.address != address {
1376 return None
1377 }
1378 revert.storage_revert.into_iter().find_map(|(key, value)| {
1379 let plain_key = B256::from(key.to_be_bytes());
1380 (plain_key == storage_key).then(|| StorageEntry {
1381 key: plain_key,
1382 value: value.to_previous_value(),
1383 })
1384 })
1385 });
1386 Ok(changeset)
1387 } else {
1388 let storage_history_exists = self
1389 .storage_provider
1390 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1391 .and_then(|checkpoint| {
1392 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1393 })
1394 .unwrap_or(true);
1395
1396 if !storage_history_exists {
1397 return Err(ProviderError::StateAtBlockPruned(block_number))
1398 }
1399
1400 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1401 }
1402 }
1403
1404 fn storage_changesets_range(
1405 &self,
1406 range: impl RangeBounds<BlockNumber>,
1407 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1408 let range = to_range(range);
1409 let mut changesets = Vec::new();
1410 let database_start = range.start;
1411 let mut database_end = range.end;
1412
1413 if let Some(head_block) = &self.head_block {
1414 database_end = head_block.anchor().number;
1415
1416 for state in head_block.chain() {
1417 let block_changesets = state
1418 .block_ref()
1419 .execution_output
1420 .state
1421 .reverts
1422 .clone()
1423 .to_plain_state_reverts()
1424 .storage
1425 .into_iter()
1426 .flatten()
1427 .flat_map(|revert: PlainStorageRevert| {
1428 revert.storage_revert.into_iter().map(move |(key, value)| {
1429 let plain_key = B256::from(key.to_be_bytes());
1430 (
1431 BlockNumberAddress((state.number(), revert.address)),
1432 StorageEntry { key: plain_key, value: value.to_previous_value() },
1433 )
1434 })
1435 });
1436
1437 changesets.extend(block_changesets);
1438 }
1439 }
1440
1441 if database_start < database_end {
1442 let storage_history_exists = self
1443 .storage_provider
1444 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1445 .and_then(|checkpoint| {
1446 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1447 })
1448 .unwrap_or(true);
1449
1450 if !storage_history_exists {
1451 return Err(ProviderError::StateAtBlockPruned(database_start))
1452 }
1453
1454 let db_changesets = self
1455 .storage_provider
1456 .storage_changesets_range(database_start..=database_end - 1)?;
1457 changesets.extend(db_changesets);
1458 }
1459
1460 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1461
1462 Ok(changesets)
1463 }
1464
1465 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1466 let mut count = 0;
1467 if let Some(head_block) = &self.head_block {
1468 for state in head_block.chain() {
1469 count += state
1470 .block_ref()
1471 .execution_output
1472 .state
1473 .reverts
1474 .clone()
1475 .to_plain_state_reverts()
1476 .storage
1477 .into_iter()
1478 .flatten()
1479 .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1480 .sum::<usize>();
1481 }
1482 }
1483
1484 count += self.storage_provider.storage_changeset_count()?;
1485
1486 Ok(count)
1487 }
1488}
1489
1490impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1491 fn account_block_changeset(
1492 &self,
1493 block_number: BlockNumber,
1494 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1495 if let Some(state) =
1496 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1497 {
1498 let changesets = state
1499 .block_ref()
1500 .execution_output
1501 .state
1502 .reverts
1503 .clone()
1504 .to_plain_state_reverts()
1505 .accounts
1506 .into_iter()
1507 .flatten()
1508 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1509 .collect();
1510 Ok(changesets)
1511 } else {
1512 let account_history_exists = self
1516 .storage_provider
1517 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1518 .and_then(|checkpoint| {
1519 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1524 })
1525 .unwrap_or(true);
1526
1527 if !account_history_exists {
1528 return Err(ProviderError::StateAtBlockPruned(block_number))
1529 }
1530
1531 self.storage_provider.account_block_changeset(block_number)
1532 }
1533 }
1534
1535 fn get_account_before_block(
1536 &self,
1537 block_number: BlockNumber,
1538 address: Address,
1539 ) -> ProviderResult<Option<AccountBeforeTx>> {
1540 if let Some(state) =
1541 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1542 {
1543 let changeset = state
1545 .block_ref()
1546 .execution_output
1547 .state
1548 .reverts
1549 .clone()
1550 .to_plain_state_reverts()
1551 .accounts
1552 .into_iter()
1553 .flatten()
1554 .find(|(addr, _)| addr == &address)
1555 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1556 Ok(changeset)
1557 } else {
1558 let account_history_exists = self
1561 .storage_provider
1562 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1563 .and_then(|checkpoint| {
1564 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1569 })
1570 .unwrap_or(true);
1571
1572 if !account_history_exists {
1573 return Err(ProviderError::StateAtBlockPruned(block_number))
1574 }
1575
1576 self.storage_provider.get_account_before_block(block_number, address)
1578 }
1579 }
1580
1581 fn account_changesets_range(
1582 &self,
1583 range: impl core::ops::RangeBounds<BlockNumber>,
1584 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1585 let range = to_range(range);
1586 let mut changesets = Vec::new();
1587 let database_start = range.start;
1588 let mut database_end = range.end;
1589
1590 if let Some(head_block) = &self.head_block {
1592 database_end = head_block.anchor().number;
1594
1595 for state in head_block.chain() {
1596 let block_changesets = state
1598 .block_ref()
1599 .execution_output
1600 .state
1601 .reverts
1602 .clone()
1603 .to_plain_state_reverts()
1604 .accounts
1605 .into_iter()
1606 .flatten()
1607 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1608
1609 for changeset in block_changesets {
1610 changesets.push((state.number(), changeset));
1611 }
1612 }
1613 }
1614
1615 if database_start < database_end {
1617 let account_history_exists = self
1619 .storage_provider
1620 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1621 .and_then(|checkpoint| {
1622 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1623 })
1624 .unwrap_or(true);
1625
1626 if !account_history_exists {
1627 return Err(ProviderError::StateAtBlockPruned(database_start))
1628 }
1629
1630 let db_changesets =
1631 self.storage_provider.account_changesets_range(database_start..database_end)?;
1632 changesets.extend(db_changesets);
1633 }
1634
1635 changesets.sort_by_key(|(block_num, _)| *block_num);
1636
1637 Ok(changesets)
1638 }
1639
1640 fn account_changeset_count(&self) -> ProviderResult<usize> {
1641 let mut count = 0;
1643 if let Some(head_block) = &self.head_block {
1644 for state in head_block.chain() {
1645 count += state
1646 .block_ref()
1647 .execution_output
1648 .state
1649 .reverts
1650 .clone()
1651 .to_plain_state_reverts()
1652 .accounts
1653 .len();
1654 }
1655 }
1656
1657 count += self.storage_provider.account_changeset_count()?;
1659
1660 Ok(count)
1661 }
1662}
1663
1664impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1665 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1667 let state_provider = self.latest_ref()?;
1669 state_provider.basic_account(address)
1670 }
1671}
1672
1673impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1674 type Receipt = ReceiptTy<N>;
1675
1676 fn get_state(
1686 &self,
1687 block: BlockNumber,
1688 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1689 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1690 let state = state.block_ref().execution_outcome().clone();
1691 Ok(Some(ExecutionOutcome::from((state, block))))
1692 } else {
1693 Self::get_state(self, block..=block)
1694 }
1695 }
1696}
1697
1698#[cfg(test)]
1699mod tests {
1700 use crate::{
1701 providers::blockchain_provider::BlockchainProvider,
1702 test_utils::create_test_provider_factory, BlockWriter,
1703 };
1704 use alloy_eips::BlockHashOrNumber;
1705 use alloy_primitives::B256;
1706 use itertools::Itertools;
1707 use rand::Rng;
1708 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1709 use reth_db_api::models::AccountBeforeTx;
1710 use reth_ethereum_primitives::Block;
1711 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1712 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1713 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1714 use reth_testing_utils::generators::{
1715 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1716 };
1717 use revm_database::BundleState;
1718 use std::{
1719 ops::{Bound, Range, RangeBounds},
1720 sync::Arc,
1721 };
1722
1723 const TEST_BLOCKS_COUNT: usize = 5;
1724
1725 fn random_blocks(
1726 rng: &mut impl Rng,
1727 database_blocks: usize,
1728 in_memory_blocks: usize,
1729 requests_count: Option<Range<u8>>,
1730 withdrawals_count: Option<Range<u8>>,
1731 tx_count: impl RangeBounds<u8>,
1732 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1733 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1734
1735 let tx_start = match tx_count.start_bound() {
1736 Bound::Included(&n) | Bound::Excluded(&n) => n,
1737 Bound::Unbounded => u8::MIN,
1738 };
1739 let tx_end = match tx_count.end_bound() {
1740 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1741 Bound::Unbounded => u8::MAX,
1742 };
1743
1744 let blocks = random_block_range(
1745 rng,
1746 0..=block_range,
1747 BlockRangeParams {
1748 parent: Some(B256::ZERO),
1749 tx_count: tx_start..tx_end,
1750 requests_count,
1751 withdrawals_count,
1752 },
1753 );
1754 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1755 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1756 }
1757
1758 #[test]
1759 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1760 let mut rng = generators::rng();
1762 let factory = create_test_provider_factory();
1763
1764 let blocks = random_block_range(
1766 &mut rng,
1767 0..=10,
1768 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1769 );
1770 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1771
1772 let provider_rw = factory.provider_rw()?;
1774 for block in database_blocks {
1775 provider_rw.insert_block(
1776 &block.clone().try_recover().expect("failed to seal block with senders"),
1777 )?;
1778 }
1779 provider_rw.commit()?;
1780
1781 let provider = BlockchainProvider::new(factory)?;
1783 let consistent_provider = provider.consistent_provider()?;
1784
1785 let first_db_block = database_blocks.first().unwrap();
1787 let first_in_mem_block = in_memory_blocks.first().unwrap();
1788 let last_in_mem_block = in_memory_blocks.last().unwrap();
1789
1790 assert_eq!(
1792 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1793 None
1794 );
1795 assert_eq!(
1796 consistent_provider
1797 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1798 None
1799 );
1800 assert_eq!(
1802 consistent_provider
1803 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1804 None
1805 );
1806
1807 let in_memory_block_senders =
1809 first_in_mem_block.senders().expect("failed to recover senders");
1810 let chain = NewCanonicalChain::Commit {
1811 new: vec![ExecutedBlock {
1812 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1813 first_in_mem_block.clone(),
1814 in_memory_block_senders,
1815 )),
1816 ..Default::default()
1817 }],
1818 };
1819 consistent_provider.canonical_in_memory_state.update_chain(chain);
1820 let consistent_provider = provider.consistent_provider()?;
1821
1822 assert_eq!(
1824 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1825 Some(first_in_mem_block.clone().into_block())
1826 );
1827 assert_eq!(
1828 consistent_provider
1829 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1830 Some(first_in_mem_block.clone().into_block())
1831 );
1832
1833 assert_eq!(
1835 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1836 Some(first_db_block.clone().into_block())
1837 );
1838 assert_eq!(
1839 consistent_provider
1840 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1841 Some(first_db_block.clone().into_block())
1842 );
1843
1844 assert_eq!(
1846 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1847 None
1848 );
1849
1850 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1852 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1853 last_in_mem_block.clone(),
1854 Default::default(),
1855 )),
1856 ..Default::default()
1857 });
1858
1859 assert_eq!(
1861 consistent_provider
1862 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1863 Some(last_in_mem_block.clone_block())
1864 );
1865
1866 Ok(())
1867 }
1868
1869 #[test]
1870 fn test_block_reader_block() -> eyre::Result<()> {
1871 let mut rng = generators::rng();
1873 let factory = create_test_provider_factory();
1874
1875 let blocks = random_block_range(
1877 &mut rng,
1878 0..=10,
1879 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1880 );
1881 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1882
1883 let provider_rw = factory.provider_rw()?;
1885 for block in database_blocks {
1886 provider_rw.insert_block(
1887 &block.clone().try_recover().expect("failed to seal block with senders"),
1888 )?;
1889 }
1890 provider_rw.commit()?;
1891
1892 let provider = BlockchainProvider::new(factory)?;
1894 let consistent_provider = provider.consistent_provider()?;
1895
1896 let first_in_mem_block = in_memory_blocks.first().unwrap();
1898 let first_db_block = database_blocks.first().unwrap();
1900
1901 assert_eq!(
1903 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1904 None
1905 );
1906 assert_eq!(
1907 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1908 None
1909 );
1910
1911 let in_memory_block_senders =
1913 first_in_mem_block.senders().expect("failed to recover senders");
1914 let chain = NewCanonicalChain::Commit {
1915 new: vec![ExecutedBlock {
1916 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1917 first_in_mem_block.clone(),
1918 in_memory_block_senders,
1919 )),
1920 ..Default::default()
1921 }],
1922 };
1923 consistent_provider.canonical_in_memory_state.update_chain(chain);
1924
1925 let consistent_provider = provider.consistent_provider()?;
1926
1927 assert_eq!(
1929 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1930 Some(first_in_mem_block.clone().into_block())
1931 );
1932 assert_eq!(
1933 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1934 Some(first_in_mem_block.clone().into_block())
1935 );
1936
1937 assert_eq!(
1939 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1940 Some(first_db_block.clone().into_block())
1941 );
1942 assert_eq!(
1943 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1944 Some(first_db_block.clone().into_block())
1945 );
1946
1947 Ok(())
1948 }
1949
1950 #[test]
1951 fn test_changeset_reader() -> eyre::Result<()> {
1952 let mut rng = generators::rng();
1953
1954 let (database_blocks, in_memory_blocks) =
1955 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1956
1957 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1958 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1959 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1960
1961 let accounts = random_eoa_accounts(&mut rng, 2);
1962
1963 let (database_changesets, database_state) = random_changeset_range(
1964 &mut rng,
1965 &database_blocks,
1966 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1967 0..0,
1968 0..0,
1969 );
1970 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1971 &mut rng,
1972 &in_memory_blocks,
1973 database_state
1974 .iter()
1975 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1976 0..0,
1977 0..0,
1978 );
1979
1980 let factory = create_test_provider_factory();
1981
1982 let provider_rw = factory.provider_rw()?;
1983 provider_rw.append_blocks_with_state(
1984 database_blocks
1985 .into_iter()
1986 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1987 .collect(),
1988 &ExecutionOutcome {
1989 bundle: BundleState::new(
1990 database_state.into_iter().map(|(address, (account, _))| {
1991 (address, None, Some(account.into()), Default::default())
1992 }),
1993 database_changesets.iter().map(|block_changesets| {
1994 block_changesets.iter().map(|(address, account, _)| {
1995 (*address, Some(Some((*account).into())), [])
1996 })
1997 }),
1998 Vec::new(),
1999 ),
2000 first_block: first_database_block,
2001 ..Default::default()
2002 },
2003 Default::default(),
2004 )?;
2005 provider_rw.commit()?;
2006
2007 let provider = BlockchainProvider::new(factory)?;
2008
2009 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2010 let chain = NewCanonicalChain::Commit {
2011 new: vec![in_memory_blocks
2012 .first()
2013 .map(|block| {
2014 let senders = block.senders().expect("failed to recover senders");
2015 ExecutedBlock {
2016 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2017 block.clone(),
2018 senders,
2019 )),
2020 execution_output: Arc::new(BlockExecutionOutput {
2021 state: BundleState::new(
2022 in_memory_state.into_iter().map(|(address, (account, _))| {
2023 (address, None, Some(account.into()), Default::default())
2024 }),
2025 [in_memory_changesets.iter().map(|(address, account, _)| {
2026 (*address, Some(Some((*account).into())), Vec::new())
2027 })],
2028 [],
2029 ),
2030 result: BlockExecutionResult {
2031 receipts: Default::default(),
2032 requests: Default::default(),
2033 gas_used: 0,
2034 blob_gas_used: 0,
2035 },
2036 }),
2037 ..Default::default()
2038 }
2039 })
2040 .unwrap()],
2041 };
2042 provider.canonical_in_memory_state.update_chain(chain);
2043
2044 let consistent_provider = provider.consistent_provider()?;
2045
2046 assert_eq!(
2047 consistent_provider.account_block_changeset(last_database_block).unwrap(),
2048 database_changesets
2049 .into_iter()
2050 .next_back()
2051 .unwrap()
2052 .into_iter()
2053 .sorted_by_key(|(address, _, _)| *address)
2054 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2055 .collect::<Vec<_>>()
2056 );
2057 assert_eq!(
2058 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2059 in_memory_changesets
2060 .into_iter()
2061 .sorted_by_key(|(address, _, _)| *address)
2062 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2063 .collect::<Vec<_>>()
2064 );
2065
2066 Ok(())
2067 }
2068 #[test]
2069 fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
2070 use alloy_primitives::U256;
2071 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2072 use reth_primitives_traits::StorageEntry;
2073 use reth_storage_api::StorageSettingsCache;
2074 use std::collections::HashMap;
2075
2076 let address = alloy_primitives::Address::with_last_byte(1);
2077 let account = reth_primitives_traits::Account {
2078 nonce: 1,
2079 balance: U256::from(1000),
2080 bytecode_hash: None,
2081 };
2082 let slot = U256::from(0x42);
2083 let slot_b256 = B256::from(slot);
2084
2085 let mut rng = generators::rng();
2086 let factory = create_test_provider_factory();
2087 factory.set_storage_settings_cache(StorageSettings::v1());
2088
2089 let blocks = random_block_range(
2090 &mut rng,
2091 0..=1,
2092 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2093 );
2094
2095 let provider_rw = factory.provider_rw()?;
2096 provider_rw.append_blocks_with_state(
2097 blocks
2098 .into_iter()
2099 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2100 .collect(),
2101 &ExecutionOutcome {
2102 bundle: BundleState::new(
2103 [(address, None, Some(account.into()), {
2104 let mut s = HashMap::default();
2105 s.insert(slot, (U256::ZERO, U256::from(100)));
2106 s
2107 })],
2108 [
2109 Vec::new(),
2110 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2111 ],
2112 [],
2113 ),
2114 first_block: 0,
2115 ..Default::default()
2116 },
2117 Default::default(),
2118 )?;
2119
2120 provider_rw.tx_ref().put::<tables::PlainStorageState>(
2121 address,
2122 StorageEntry { key: slot_b256, value: U256::from(100) },
2123 )?;
2124 provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
2125
2126 provider_rw.commit()?;
2127
2128 let provider = BlockchainProvider::new(factory)?;
2129 let consistent_provider = provider.consistent_provider()?;
2130
2131 let outcome =
2132 consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2133
2134 let state = &outcome.bundle.state;
2135 let account_state = state.get(&address).expect("should have account in bundle state");
2136 let storage = &account_state.storage;
2137
2138 let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2139
2140 assert_eq!(
2141 storage_slot.present_value,
2142 U256::from(100),
2143 "present_value should be 100 (the actual value in PlainStorageState)"
2144 );
2145
2146 Ok(())
2147 }
2148
2149 #[test]
2150 fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2151 use alloy_primitives::U256;
2152 use reth_db_api::models::StorageSettings;
2153 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2154 use std::collections::HashMap;
2155
2156 let mut rng = generators::rng();
2157 let factory = create_test_provider_factory();
2158 factory.set_storage_settings_cache(StorageSettings::v1());
2159
2160 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2161
2162 let address = alloy_primitives::Address::with_last_byte(1);
2163 let account = reth_primitives_traits::Account {
2164 nonce: 1,
2165 balance: U256::from(1000),
2166 bytecode_hash: None,
2167 };
2168 let slot = U256::from(0x42);
2169
2170 let provider_rw = factory.provider_rw()?;
2171 provider_rw.append_blocks_with_state(
2172 database_blocks
2173 .into_iter()
2174 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2175 .collect(),
2176 &ExecutionOutcome {
2177 bundle: BundleState::new(
2178 [(address, None, Some(account.into()), {
2179 let mut s = HashMap::default();
2180 s.insert(slot, (U256::ZERO, U256::from(100)));
2181 s
2182 })],
2183 [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2184 [],
2185 ),
2186 first_block: 0,
2187 ..Default::default()
2188 },
2189 Default::default(),
2190 )?;
2191 provider_rw.commit()?;
2192
2193 let provider = BlockchainProvider::new(factory)?;
2194
2195 let in_mem_block = in_memory_blocks.first().unwrap();
2196 let senders = in_mem_block.senders().expect("failed to recover senders");
2197 let chain = NewCanonicalChain::Commit {
2198 new: vec![ExecutedBlock {
2199 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2200 in_mem_block.clone(),
2201 senders,
2202 )),
2203 execution_output: Arc::new(BlockExecutionOutput {
2204 state: BundleState::new(
2205 [(address, None, Some(account.into()), {
2206 let mut s = HashMap::default();
2207 s.insert(slot, (U256::from(100), U256::from(200)));
2208 s
2209 })],
2210 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2211 [],
2212 ),
2213 result: BlockExecutionResult {
2214 receipts: Default::default(),
2215 requests: Default::default(),
2216 gas_used: 0,
2217 blob_gas_used: 0,
2218 },
2219 }),
2220 ..Default::default()
2221 }],
2222 };
2223 provider.canonical_in_memory_state.update_chain(chain);
2224
2225 let consistent_provider = provider.consistent_provider()?;
2226
2227 let db_changeset = consistent_provider.storage_changeset(0)?;
2228 let mem_changeset = consistent_provider.storage_changeset(1)?;
2229
2230 let slot_b256 = B256::from(slot);
2231
2232 assert_eq!(db_changeset.len(), 1);
2233 assert_eq!(mem_changeset.len(), 1);
2234
2235 let db_key = db_changeset[0].1.key;
2236 let mem_key = mem_changeset[0].1.key;
2237
2238 assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2239 assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2240 assert_eq!(
2241 db_key, mem_key,
2242 "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2243 );
2244
2245 Ok(())
2246 }
2247
2248 #[test]
2249 fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2250 use alloy_primitives::U256;
2251 use reth_db_api::models::StorageSettings;
2252 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2253 use std::collections::HashMap;
2254
2255 let mut rng = generators::rng();
2256 let factory = create_test_provider_factory();
2257 factory.set_storage_settings_cache(StorageSettings::v1());
2258
2259 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2260
2261 let address = alloy_primitives::Address::with_last_byte(1);
2262 let account = reth_primitives_traits::Account {
2263 nonce: 1,
2264 balance: U256::from(1000),
2265 bytecode_hash: None,
2266 };
2267 let slot = U256::from(0x42);
2268
2269 let provider_rw = factory.provider_rw()?;
2270 provider_rw.append_blocks_with_state(
2271 database_blocks
2272 .into_iter()
2273 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2274 .collect(),
2275 &ExecutionOutcome {
2276 bundle: BundleState::new(
2277 [(address, None, Some(account.into()), {
2278 let mut s = HashMap::default();
2279 s.insert(slot, (U256::ZERO, U256::from(100)));
2280 s
2281 })],
2282 vec![
2283 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2284 vec![],
2285 ],
2286 [],
2287 ),
2288 first_block: 0,
2289 ..Default::default()
2290 },
2291 Default::default(),
2292 )?;
2293 provider_rw.commit()?;
2294
2295 let provider = BlockchainProvider::new(factory)?;
2296
2297 let in_mem_block = in_memory_blocks.first().unwrap();
2298 let senders = in_mem_block.senders().expect("failed to recover senders");
2299 let chain = NewCanonicalChain::Commit {
2300 new: vec![ExecutedBlock {
2301 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2302 in_mem_block.clone(),
2303 senders,
2304 )),
2305 execution_output: Arc::new(BlockExecutionOutput {
2306 state: BundleState::new(
2307 [(address, None, Some(account.into()), {
2308 let mut s = HashMap::default();
2309 s.insert(slot, (U256::from(100), U256::from(200)));
2310 s
2311 })],
2312 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2313 [],
2314 ),
2315 result: BlockExecutionResult {
2316 receipts: Default::default(),
2317 requests: Default::default(),
2318 gas_used: 0,
2319 blob_gas_used: 0,
2320 },
2321 }),
2322 ..Default::default()
2323 }],
2324 };
2325 provider.canonical_in_memory_state.update_chain(chain);
2326
2327 let consistent_provider = provider.consistent_provider()?;
2328
2329 let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2330
2331 assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2332
2333 let slot_b256 = B256::from(slot);
2334 let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2335
2336 assert_eq!(
2337 keys[0], keys[1],
2338 "same logical slot should produce identical keys whether from DB or memory"
2339 );
2340 assert_eq!(
2341 keys[0], slot_b256,
2342 "keys should be plain/unhashed when use_hashed_state is false"
2343 );
2344
2345 Ok(())
2346 }
2347}