1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{
16 map::{hash_map, HashMap},
17 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30 StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
224 &self,
225 account_changeset: Vec<(u64, AccountBeforeTx)>,
226 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
227 block_range_end: BlockNumber,
228 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
229 let mut state: BundleStateInit = HashMap::default();
230 let mut reverts: RevertsInit = HashMap::default();
231 let state_provider = self.state_by_block_number_ref(block_range_end)?;
232
233 for (block_number, account_before) in account_changeset.into_iter().rev() {
235 let AccountBeforeTx { info: old_info, address } = account_before;
236 match state.entry(address) {
237 hash_map::Entry::Vacant(entry) => {
238 let new_info = state_provider.basic_account(&address)?;
239 entry.insert((old_info, new_info, HashMap::default()));
240 }
241 hash_map::Entry::Occupied(mut entry) => {
242 entry.get_mut().0 = old_info;
244 }
245 }
246 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
248 }
249
250 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
252 let BlockNumberAddress((block_number, address)) = block_and_address;
253 let account_state = match state.entry(address) {
255 hash_map::Entry::Vacant(entry) => {
256 let present_info = state_provider.basic_account(&address)?;
257 entry.insert((present_info, present_info, HashMap::default()))
258 }
259 hash_map::Entry::Occupied(entry) => entry.into_mut(),
260 };
261
262 match account_state.2.entry(old_storage.key) {
264 hash_map::Entry::Vacant(entry) => {
265 let new_storage_value =
266 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
267 entry.insert((old_storage.value, new_storage_value));
268 }
269 hash_map::Entry::Occupied(mut entry) => {
270 entry.get_mut().0 = old_storage.value;
271 }
272 };
273
274 reverts
275 .entry(block_number)
276 .or_default()
277 .entry(address)
278 .or_default()
279 .1
280 .push(old_storage);
281 }
282
283 Ok((state, reverts))
284 }
285
286 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298 &self,
299 range: impl RangeBounds<BlockNumber>,
300 fetch_db_range: F,
301 map_block_state_item: G,
302 mut predicate: P,
303 ) -> ProviderResult<Vec<T>>
304 where
305 F: FnOnce(
306 &DatabaseProviderRO<N::DB, N>,
307 RangeInclusive<BlockNumber>,
308 &mut P,
309 ) -> ProviderResult<Vec<T>>,
310 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311 P: FnMut(&T) -> bool,
312 {
313 let mut in_memory_chain =
321 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322 let db_provider = &self.storage_provider;
323
324 let (start, end) = self.convert_range_bounds(range, || {
325 in_memory_chain
327 .first()
328 .map(|b| b.number())
329 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330 });
331
332 if start > end {
333 return Ok(vec![])
334 }
335
336 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341 Some(lowest_memory_block) if lowest_memory_block <= end => {
342 let highest_memory_block =
343 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345 let in_memory_range =
349 lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351 in_memory_chain.truncate(
354 in_memory_chain
355 .len()
356 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357 );
358
359 let storage_range =
360 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362 (Some((in_memory_chain, in_memory_range)), storage_range)
363 }
364 _ => {
365 drop(in_memory_chain);
367
368 (None, Some(start..=end))
369 }
370 };
371
372 let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374 if let Some(storage_range) = storage_range {
375 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376 items.append(&mut db_items);
377
378 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381 return Ok(items)
382 }
383 }
384
385 if let Some((in_memory_chain, in_memory_range)) = in_memory {
386 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387 debug_assert!(num == block.number());
388 if let Some(item) = map_block_state_item(block, &mut predicate) {
389 items.push(item);
390 } else {
391 break
392 }
393 }
394 }
395
396 Ok(items)
397 }
398
399 fn block_state_provider_ref(
401 &self,
402 state: &BlockState<N::Primitives>,
403 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404 let anchor_hash = state.anchor().hash;
405 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408 }
409
410 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416 &self,
417 range: impl RangeBounds<BlockNumber>,
418 fetch_from_db: S,
419 fetch_from_block_state: M,
420 ) -> ProviderResult<Vec<R>>
421 where
422 S: FnOnce(
423 &DatabaseProviderRO<N::DB, N>,
424 RangeInclusive<TxNumber>,
425 ) -> ProviderResult<Vec<R>>,
426 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427 {
428 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429 let provider = &self.storage_provider;
430
431 let last_database_block_number = in_mem_chain
434 .last()
435 .map(|b| Ok(b.anchor().number))
436 .unwrap_or_else(|| provider.last_block_number())?;
437
438 let last_block_body_index = provider
441 .block_body_indices(last_database_block_number)?
442 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445 let (start, end) = self.convert_range_bounds(range, || {
446 in_mem_chain
447 .iter()
448 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449 .sum::<u64>() +
450 last_block_body_index.last_tx_num()
451 });
452
453 if start > end {
454 return Ok(vec![])
455 }
456
457 let mut tx_range = start..=end;
458
459 if *tx_range.end() < in_memory_tx_num {
462 return fetch_from_db(provider, tx_range);
463 }
464
465 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467 if *tx_range.start() < in_memory_tx_num {
469 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472 tx_range = in_memory_tx_num..=*tx_range.end();
474
475 items.extend(fetch_from_db(provider, db_range)?);
476 }
477
478 for block_state in in_mem_chain.iter().rev() {
480 let block_tx_count =
481 block_state.block_ref().recovered_block().body().transactions().len();
482 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487 in_memory_tx_num += block_tx_count as u64;
488 continue
489 }
490
491 let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494 items.extend(fetch_from_block_state(
495 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496 block_state,
497 )?);
498
499 in_memory_tx_num += block_tx_count as u64;
500
501 if in_memory_tx_num > *tx_range.end() {
503 break
504 }
505
506 tx_range = in_memory_tx_num..=*tx_range.end();
508 }
509
510 Ok(items)
511 }
512
513 fn get_in_memory_or_storage_by_tx<S, M, R>(
516 &self,
517 id: HashOrNumber,
518 fetch_from_db: S,
519 fetch_from_block_state: M,
520 ) -> ProviderResult<Option<R>>
521 where
522 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524 {
525 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526 let provider = &self.storage_provider;
527
528 let last_database_block_number = in_mem_chain
531 .last()
532 .map(|b| Ok(b.anchor().number))
533 .unwrap_or_else(|| provider.last_block_number())?;
534
535 let last_block_body_index = provider
538 .block_body_indices(last_database_block_number)?
539 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542 if let HashOrNumber::Number(id) = id &&
545 id < in_memory_tx_num
546 {
547 return fetch_from_db(provider)
548 }
549
550 for block_state in in_mem_chain.iter().rev() {
552 let executed_block = block_state.block_ref();
553 let block = executed_block.recovered_block();
554
555 for tx_index in 0..block.body().transactions().len() {
556 match id {
557 HashOrNumber::Hash(tx_hash) => {
558 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 HashOrNumber::Number(id) => {
563 if id == in_memory_tx_num {
564 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565 }
566 }
567 }
568
569 in_memory_tx_num += 1;
570 }
571 }
572
573 if let HashOrNumber::Hash(_) = id {
575 return fetch_from_db(provider)
576 }
577
578 Ok(None)
579 }
580
581 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583 &self,
584 id: BlockHashOrNumber,
585 fetch_from_db: S,
586 fetch_from_block_state: M,
587 ) -> ProviderResult<R>
588 where
589 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591 {
592 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593 return fetch_from_block_state(block_state)
594 }
595 fetch_from_db(&self.storage_provider)
596 }
597
598 pub(crate) fn into_state_provider_at_block_hash(
600 self,
601 block_hash: BlockHash,
602 ) -> ProviderResult<StateProviderBox> {
603 let block_number =
605 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
606 self.ensure_canonical_block(block_number)?;
607
608 let Self { storage_provider, head_block, .. } = self;
609 if let Some(Some(block_state)) =
610 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
611 {
612 let anchor_hash = block_state.anchor().hash;
613 let block_number = storage_provider
614 .block_number(anchor_hash)?
615 .ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
616 let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
617 return Ok(Box::new(block_state.state_provider(latest_historical)));
618 }
619 storage_provider.try_into_history_at_block(block_number)
620 }
621}
622
623impl<N: ProviderNodeTypes> ConsistentProvider<N> {
624 #[inline]
633 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
634 let latest = self.best_block_number()?;
635 if block_number > latest {
636 Err(ProviderError::HeaderNotFound(block_number.into()))
637 } else {
638 Ok(())
639 }
640 }
641}
642
643impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
644 type Primitives = N::Primitives;
645}
646
647impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
648 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
649 self.storage_provider.static_file_provider()
650 }
651
652 fn get_static_file_writer(
653 &self,
654 block: BlockNumber,
655 segment: StaticFileSegment,
656 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
657 self.storage_provider.get_static_file_writer(block, segment)
658 }
659}
660
661impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
662 type Header = HeaderTy<N>;
663
664 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
665 self.get_in_memory_or_storage_by_block(
666 block_hash.into(),
667 |db_provider| db_provider.header(block_hash),
668 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
669 )
670 }
671
672 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
673 self.get_in_memory_or_storage_by_block(
674 num.into(),
675 |db_provider| db_provider.header_by_number(num),
676 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
677 )
678 }
679
680 fn headers_range(
681 &self,
682 range: impl RangeBounds<BlockNumber>,
683 ) -> ProviderResult<Vec<Self::Header>> {
684 self.get_in_memory_or_storage_by_block_range_while(
685 range,
686 |db_provider, range, _| db_provider.headers_range(range),
687 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
688 |_| true,
689 )
690 }
691
692 fn sealed_header(
693 &self,
694 number: BlockNumber,
695 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
696 self.get_in_memory_or_storage_by_block(
697 number.into(),
698 |db_provider| db_provider.sealed_header(number),
699 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
700 )
701 }
702
703 fn sealed_headers_range(
704 &self,
705 range: impl RangeBounds<BlockNumber>,
706 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
707 self.get_in_memory_or_storage_by_block_range_while(
708 range,
709 |db_provider, range, _| db_provider.sealed_headers_range(range),
710 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
711 |_| true,
712 )
713 }
714
715 fn sealed_headers_while(
716 &self,
717 range: impl RangeBounds<BlockNumber>,
718 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
719 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
720 self.get_in_memory_or_storage_by_block_range_while(
721 range,
722 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
723 |block_state, predicate| {
724 let header = block_state.block_ref().recovered_block().sealed_header();
725 predicate(header).then(|| header.clone())
726 },
727 predicate,
728 )
729 }
730}
731
732impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
733 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
734 self.get_in_memory_or_storage_by_block(
735 number.into(),
736 |db_provider| db_provider.block_hash(number),
737 |block_state| Ok(Some(block_state.hash())),
738 )
739 }
740
741 fn canonical_hashes_range(
742 &self,
743 start: BlockNumber,
744 end: BlockNumber,
745 ) -> ProviderResult<Vec<B256>> {
746 self.get_in_memory_or_storage_by_block_range_while(
747 start..end,
748 |db_provider, inclusive_range, _| {
749 db_provider
750 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
751 },
752 |block_state, _| Some(block_state.hash()),
753 |_| true,
754 )
755 }
756}
757
758impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
759 fn chain_info(&self) -> ProviderResult<ChainInfo> {
760 let best_number = self.best_block_number()?;
761 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
762 }
763
764 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
765 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
766 }
767
768 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
769 self.storage_provider.last_block_number()
770 }
771
772 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
773 self.get_in_memory_or_storage_by_block(
774 hash.into(),
775 |db_provider| db_provider.block_number(hash),
776 |block_state| Ok(Some(block_state.number())),
777 )
778 }
779}
780
781impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
782 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
783 Ok(self.canonical_in_memory_state.pending_block_num_hash())
784 }
785
786 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787 Ok(self.canonical_in_memory_state.get_safe_num_hash())
788 }
789
790 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
791 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
792 }
793}
794
795impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
796 type Block = BlockTy<N>;
797
798 fn find_block_by_hash(
799 &self,
800 hash: B256,
801 source: BlockSource,
802 ) -> ProviderResult<Option<Self::Block>> {
803 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
804 let Some(block) = self.get_in_memory_or_storage_by_block(
805 hash.into(),
806 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
807 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
808 )?
809 {
810 return Ok(Some(block))
811 }
812
813 if matches!(source, BlockSource::Pending | BlockSource::Any) {
814 return Ok(self
815 .canonical_in_memory_state
816 .pending_block()
817 .filter(|b| b.hash() == hash)
818 .map(|b| b.into_block()))
819 }
820
821 Ok(None)
822 }
823
824 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
825 self.get_in_memory_or_storage_by_block(
826 id,
827 |db_provider| db_provider.block(id),
828 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
829 )
830 }
831
832 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
833 Ok(self.canonical_in_memory_state.pending_recovered_block())
834 }
835
836 fn pending_block_and_receipts(
837 &self,
838 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
839 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
840 }
841
842 fn recovered_block(
849 &self,
850 id: BlockHashOrNumber,
851 transaction_kind: TransactionVariant,
852 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
853 self.get_in_memory_or_storage_by_block(
854 id,
855 |db_provider| db_provider.recovered_block(id, transaction_kind),
856 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
857 )
858 }
859
860 fn sealed_block_with_senders(
861 &self,
862 id: BlockHashOrNumber,
863 transaction_kind: TransactionVariant,
864 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
865 self.get_in_memory_or_storage_by_block(
866 id,
867 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
868 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
869 )
870 }
871
872 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
873 self.get_in_memory_or_storage_by_block_range_while(
874 range,
875 |db_provider, range, _| db_provider.block_range(range),
876 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
877 |_| true,
878 )
879 }
880
881 fn block_with_senders_range(
882 &self,
883 range: RangeInclusive<BlockNumber>,
884 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
885 self.get_in_memory_or_storage_by_block_range_while(
886 range,
887 |db_provider, range, _| db_provider.block_with_senders_range(range),
888 |block_state, _| Some(block_state.block().recovered_block().clone()),
889 |_| true,
890 )
891 }
892
893 fn recovered_block_range(
894 &self,
895 range: RangeInclusive<BlockNumber>,
896 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
897 self.get_in_memory_or_storage_by_block_range_while(
898 range,
899 |db_provider, range, _| db_provider.recovered_block_range(range),
900 |block_state, _| Some(block_state.block().recovered_block().clone()),
901 |_| true,
902 )
903 }
904
905 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
906 self.get_in_memory_or_storage_by_tx(
907 id.into(),
908 |db_provider| db_provider.block_by_transaction_id(id),
909 |_, _, block_state| Ok(Some(block_state.number())),
910 )
911 }
912}
913
914impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
915 type Transaction = TxTy<N>;
916
917 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
918 self.get_in_memory_or_storage_by_tx(
919 tx_hash.into(),
920 |db_provider| db_provider.transaction_id(tx_hash),
921 |_, tx_number, _| Ok(Some(tx_number)),
922 )
923 }
924
925 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
926 self.get_in_memory_or_storage_by_tx(
927 id.into(),
928 |provider| provider.transaction_by_id(id),
929 |tx_index, _, block_state| {
930 Ok(block_state
931 .block_ref()
932 .recovered_block()
933 .body()
934 .transactions()
935 .get(tx_index)
936 .cloned())
937 },
938 )
939 }
940
941 fn transaction_by_id_unhashed(
942 &self,
943 id: TxNumber,
944 ) -> ProviderResult<Option<Self::Transaction>> {
945 self.get_in_memory_or_storage_by_tx(
946 id.into(),
947 |provider| provider.transaction_by_id_unhashed(id),
948 |tx_index, _, block_state| {
949 Ok(block_state
950 .block_ref()
951 .recovered_block()
952 .body()
953 .transactions()
954 .get(tx_index)
955 .cloned())
956 },
957 )
958 }
959
960 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
961 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
962 return Ok(Some(tx))
963 }
964
965 self.storage_provider.transaction_by_hash(hash)
966 }
967
968 fn transaction_by_hash_with_meta(
969 &self,
970 tx_hash: TxHash,
971 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
972 if let Some((tx, meta)) =
973 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
974 {
975 return Ok(Some((tx, meta)))
976 }
977
978 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
979 }
980
981 fn transactions_by_block(
982 &self,
983 id: BlockHashOrNumber,
984 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
985 self.get_in_memory_or_storage_by_block(
986 id,
987 |provider| provider.transactions_by_block(id),
988 |block_state| {
989 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
990 },
991 )
992 }
993
994 fn transactions_by_block_range(
995 &self,
996 range: impl RangeBounds<BlockNumber>,
997 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
998 self.get_in_memory_or_storage_by_block_range_while(
999 range,
1000 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1001 |block_state, _| {
1002 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1003 },
1004 |_| true,
1005 )
1006 }
1007
1008 fn transactions_by_tx_range(
1009 &self,
1010 range: impl RangeBounds<TxNumber>,
1011 ) -> ProviderResult<Vec<Self::Transaction>> {
1012 self.get_in_memory_or_storage_by_tx_range(
1013 range,
1014 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1015 |index_range, block_state| {
1016 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1017 .to_vec())
1018 },
1019 )
1020 }
1021
1022 fn senders_by_tx_range(
1023 &self,
1024 range: impl RangeBounds<TxNumber>,
1025 ) -> ProviderResult<Vec<Address>> {
1026 self.get_in_memory_or_storage_by_tx_range(
1027 range,
1028 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1029 |index_range, block_state| {
1030 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1031 },
1032 )
1033 }
1034
1035 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036 self.get_in_memory_or_storage_by_tx(
1037 id.into(),
1038 |provider| provider.transaction_sender(id),
1039 |tx_index, _, block_state| {
1040 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1041 },
1042 )
1043 }
1044}
1045
1046impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1047 type Receipt = ReceiptTy<N>;
1048
1049 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1050 self.get_in_memory_or_storage_by_tx(
1051 id.into(),
1052 |provider| provider.receipt(id),
1053 |tx_index, _, block_state| {
1054 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1055 },
1056 )
1057 }
1058
1059 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1060 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1061 let executed_block = block_state.block_ref();
1062 let block = executed_block.recovered_block();
1063 let receipts = block_state.executed_block_receipts_ref();
1064
1065 debug_assert_eq!(
1067 block.body().transactions().len(),
1068 receipts.len(),
1069 "Mismatch between transaction and receipt count"
1070 );
1071
1072 if let Some(tx_index) =
1073 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1074 {
1075 return Ok(receipts.get(tx_index).cloned());
1077 }
1078 }
1079
1080 self.storage_provider.receipt_by_hash(hash)
1081 }
1082
1083 fn receipts_by_block(
1084 &self,
1085 block: BlockHashOrNumber,
1086 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1087 self.get_in_memory_or_storage_by_block(
1088 block,
1089 |db_provider| db_provider.receipts_by_block(block),
1090 |block_state| Ok(Some(block_state.executed_block_receipts())),
1091 )
1092 }
1093
1094 fn receipts_by_tx_range(
1095 &self,
1096 range: impl RangeBounds<TxNumber>,
1097 ) -> ProviderResult<Vec<Self::Receipt>> {
1098 self.get_in_memory_or_storage_by_tx_range(
1099 range,
1100 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1101 |index_range, block_state| {
1102 Ok(block_state.executed_block_receipts_ref()[index_range].to_vec())
1103 },
1104 )
1105 }
1106
1107 fn receipts_by_block_range(
1108 &self,
1109 block_range: RangeInclusive<BlockNumber>,
1110 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1111 self.storage_provider.receipts_by_block_range(block_range)
1112 }
1113}
1114
1115impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1116 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1117 match block {
1118 BlockId::Hash(rpc_block_hash) => {
1119 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1120 if receipts.is_none() &&
1121 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1122 let Some(state) = self
1123 .head_block
1124 .as_ref()
1125 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1126 {
1127 receipts = Some(state.executed_block_receipts());
1128 }
1129 Ok(receipts)
1130 }
1131 BlockId::Number(num_tag) => match num_tag {
1132 BlockNumberOrTag::Pending => Ok(self
1133 .canonical_in_memory_state
1134 .pending_state()
1135 .map(|block_state| block_state.executed_block_receipts())),
1136 _ => {
1137 if let Some(num) = self.convert_block_number(num_tag)? {
1138 self.receipts_by_block(num.into())
1139 } else {
1140 Ok(None)
1141 }
1142 }
1143 },
1144 }
1145 }
1146}
1147
1148impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1149 fn block_body_indices(
1150 &self,
1151 number: BlockNumber,
1152 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1153 self.get_in_memory_or_storage_by_block(
1154 number.into(),
1155 |db_provider| db_provider.block_body_indices(number),
1156 |block_state| {
1157 let last_storage_block_number = block_state.anchor().number;
1159 let mut stored_indices = self
1160 .storage_provider
1161 .block_body_indices(last_storage_block_number)?
1162 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1163
1164 stored_indices.first_tx_num = stored_indices.next_tx_num();
1166 stored_indices.tx_count = 0;
1167
1168 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1170 let block_tx_count =
1171 state.block_ref().recovered_block().body().transactions().len() as u64;
1172 if state.block_ref().recovered_block().number() == number {
1173 stored_indices.tx_count = block_tx_count;
1174 } else {
1175 stored_indices.first_tx_num += block_tx_count;
1176 }
1177 }
1178
1179 Ok(Some(stored_indices))
1180 },
1181 )
1182 }
1183
1184 fn block_body_indices_range(
1185 &self,
1186 range: RangeInclusive<BlockNumber>,
1187 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1188 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1189 }
1190}
1191
/// Stage checkpoints are read from the storage provider only; every method below forwards its
/// arguments unchanged.
impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
    /// Returns the checkpoint of stage `id` as reported by the storage provider.
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        self.storage_provider.get_stage_checkpoint(id)
    }

    /// Returns the checkpoint progress bytes of stage `id` as reported by the storage provider.
    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        self.storage_provider.get_stage_checkpoint_progress(id)
    }

    /// Returns all stage checkpoints as reported by the storage provider.
    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.storage_provider.get_all_checkpoints()
    }
}
1205
/// Prune checkpoints are read from the storage provider only; every method below forwards its
/// arguments unchanged.
impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
    /// Returns the prune checkpoint of `segment` as reported by the storage provider.
    fn get_prune_checkpoint(
        &self,
        segment: PruneSegment,
    ) -> ProviderResult<Option<PruneCheckpoint>> {
        self.storage_provider.get_prune_checkpoint(segment)
    }

    /// Returns all prune checkpoints as reported by the storage provider.
    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
        self.storage_provider.get_prune_checkpoints()
    }
}
1218
impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
    type ChainSpec = N::ChainSpec;

    /// Returns the chain spec held by the storage provider.
    fn chain_spec(&self) -> Arc<N::ChainSpec> {
        // Fully qualified so the call names the `ChainSpecProvider` implementation of the
        // storage provider explicitly.
        ChainSpecProvider::chain_spec(&self.storage_provider)
    }
}
1226
1227impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1228 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1229 match id {
1230 BlockId::Number(num) => self.block_by_number_or_tag(num),
1231 BlockId::Hash(hash) => {
1232 if Some(true) == hash.require_canonical {
1237 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1239 } else {
1240 self.block_by_hash(hash.block_hash)
1241 }
1242 }
1243 }
1244 }
1245
1246 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1247 Ok(match id {
1248 BlockNumberOrTag::Latest => {
1249 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1250 }
1251 BlockNumberOrTag::Finalized => {
1252 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1253 }
1254 BlockNumberOrTag::Safe => {
1255 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1256 }
1257 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1258 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1259
1260 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1261 })
1262 }
1263
1264 fn sealed_header_by_number_or_tag(
1265 &self,
1266 id: BlockNumberOrTag,
1267 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1268 match id {
1269 BlockNumberOrTag::Latest => {
1270 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1271 }
1272 BlockNumberOrTag::Finalized => {
1273 Ok(self.canonical_in_memory_state.get_finalized_header())
1274 }
1275 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1276 BlockNumberOrTag::Earliest => self
1277 .header_by_number(self.earliest_block_number()?)?
1278 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1279 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1280 BlockNumberOrTag::Number(num) => self
1281 .header_by_number(num)?
1282 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1283 }
1284 }
1285
1286 fn sealed_header_by_id(
1287 &self,
1288 id: BlockId,
1289 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1290 Ok(match id {
1291 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1292 BlockId::Hash(hash) => self
1293 .header(hash.block_hash)?
1294 .map(|header| SealedHeader::new(header, hash.block_hash)),
1295 })
1296 }
1297
1298 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1299 Ok(match id {
1300 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1301 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1302 })
1303 }
1304}
1305
1306impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1307 fn storage_changeset(
1308 &self,
1309 block_number: BlockNumber,
1310 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1311 if let Some(state) =
1312 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1313 {
1314 let changesets = state
1315 .block()
1316 .execution_output
1317 .state
1318 .reverts
1319 .to_plain_state_reverts()
1320 .storage
1321 .into_iter()
1322 .flatten()
1323 .flat_map(|revert: PlainStorageRevert| {
1324 revert.storage_revert.into_iter().map(move |(key, value)| {
1325 let plain_key = B256::from(key.to_be_bytes());
1326 (
1327 BlockNumberAddress((block_number, revert.address)),
1328 StorageEntry { key: plain_key, value: value.to_previous_value() },
1329 )
1330 })
1331 })
1332 .collect();
1333 Ok(changesets)
1334 } else {
1335 let storage_history_exists = self
1339 .storage_provider
1340 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1341 .and_then(|checkpoint| {
1342 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1347 })
1348 .unwrap_or(true);
1349
1350 if !storage_history_exists {
1351 return Err(ProviderError::StateAtBlockPruned(block_number))
1352 }
1353
1354 self.storage_provider.storage_changeset(block_number)
1355 }
1356 }
1357
1358 fn get_storage_before_block(
1359 &self,
1360 block_number: BlockNumber,
1361 address: Address,
1362 storage_key: B256,
1363 ) -> ProviderResult<Option<StorageEntry>> {
1364 if let Some(state) =
1365 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1366 {
1367 let changeset = state
1368 .block_ref()
1369 .execution_output
1370 .state
1371 .reverts
1372 .to_plain_state_reverts()
1373 .storage
1374 .into_iter()
1375 .flatten()
1376 .find_map(|revert: PlainStorageRevert| {
1377 if revert.address != address {
1378 return None
1379 }
1380 revert.storage_revert.into_iter().find_map(|(key, value)| {
1381 let plain_key = B256::from(key.to_be_bytes());
1382 (plain_key == storage_key).then(|| StorageEntry {
1383 key: plain_key,
1384 value: value.to_previous_value(),
1385 })
1386 })
1387 });
1388 Ok(changeset)
1389 } else {
1390 let storage_history_exists = self
1391 .storage_provider
1392 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1393 .and_then(|checkpoint| {
1394 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1395 })
1396 .unwrap_or(true);
1397
1398 if !storage_history_exists {
1399 return Err(ProviderError::StateAtBlockPruned(block_number))
1400 }
1401
1402 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1403 }
1404 }
1405
1406 fn storage_changesets_range(
1407 &self,
1408 range: impl RangeBounds<BlockNumber>,
1409 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1410 let range = to_range(range);
1411 let mut changesets = Vec::new();
1412 let database_start = range.start;
1413 let mut database_end = range.end;
1414
1415 if let Some(head_block) = &self.head_block {
1416 database_end = head_block.anchor().number;
1417
1418 for state in head_block.chain() {
1419 let block_changesets = state
1420 .block_ref()
1421 .execution_output
1422 .state
1423 .reverts
1424 .to_plain_state_reverts()
1425 .storage
1426 .into_iter()
1427 .flatten()
1428 .flat_map(|revert: PlainStorageRevert| {
1429 revert.storage_revert.into_iter().map(move |(key, value)| {
1430 let plain_key = B256::from(key.to_be_bytes());
1431 (
1432 BlockNumberAddress((state.number(), revert.address)),
1433 StorageEntry { key: plain_key, value: value.to_previous_value() },
1434 )
1435 })
1436 });
1437
1438 changesets.extend(block_changesets);
1439 }
1440 }
1441
1442 if database_start < database_end {
1443 let storage_history_exists = self
1444 .storage_provider
1445 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1446 .and_then(|checkpoint| {
1447 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1448 })
1449 .unwrap_or(true);
1450
1451 if !storage_history_exists {
1452 return Err(ProviderError::StateAtBlockPruned(database_start))
1453 }
1454
1455 let db_changesets = self
1456 .storage_provider
1457 .storage_changesets_range(database_start..=database_end - 1)?;
1458 changesets.extend(db_changesets);
1459 }
1460
1461 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1462
1463 Ok(changesets)
1464 }
1465}
1466
1467impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1468 fn account_block_changeset(
1469 &self,
1470 block_number: BlockNumber,
1471 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1472 if let Some(state) =
1473 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1474 {
1475 let changesets = state
1476 .block_ref()
1477 .execution_output
1478 .state
1479 .reverts
1480 .to_plain_state_reverts()
1481 .accounts
1482 .into_iter()
1483 .flatten()
1484 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1485 .collect();
1486 Ok(changesets)
1487 } else {
1488 let account_history_exists = self
1492 .storage_provider
1493 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1494 .and_then(|checkpoint| {
1495 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1500 })
1501 .unwrap_or(true);
1502
1503 if !account_history_exists {
1504 return Err(ProviderError::StateAtBlockPruned(block_number))
1505 }
1506
1507 self.storage_provider.account_block_changeset(block_number)
1508 }
1509 }
1510
1511 fn get_account_before_block(
1512 &self,
1513 block_number: BlockNumber,
1514 address: Address,
1515 ) -> ProviderResult<Option<AccountBeforeTx>> {
1516 if let Some(state) =
1517 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1518 {
1519 let changeset = state
1521 .block_ref()
1522 .execution_output
1523 .state
1524 .reverts
1525 .to_plain_state_reverts()
1526 .accounts
1527 .into_iter()
1528 .flatten()
1529 .find(|(addr, _)| addr == &address)
1530 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1531 Ok(changeset)
1532 } else {
1533 let account_history_exists = self
1536 .storage_provider
1537 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1538 .and_then(|checkpoint| {
1539 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1544 })
1545 .unwrap_or(true);
1546
1547 if !account_history_exists {
1548 return Err(ProviderError::StateAtBlockPruned(block_number))
1549 }
1550
1551 self.storage_provider.get_account_before_block(block_number, address)
1553 }
1554 }
1555
1556 fn account_changesets_range(
1557 &self,
1558 range: impl core::ops::RangeBounds<BlockNumber>,
1559 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1560 let range = to_range(range);
1561 let mut changesets = Vec::new();
1562 let database_start = range.start;
1563 let mut database_end = range.end;
1564
1565 if let Some(head_block) = &self.head_block {
1567 database_end = head_block.anchor().number;
1569
1570 for state in head_block.chain() {
1571 let block_changesets = state
1573 .block_ref()
1574 .execution_output
1575 .state
1576 .reverts
1577 .to_plain_state_reverts()
1578 .accounts
1579 .into_iter()
1580 .flatten()
1581 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1582
1583 for changeset in block_changesets {
1584 changesets.push((state.number(), changeset));
1585 }
1586 }
1587 }
1588
1589 if database_start < database_end {
1591 let account_history_exists = self
1593 .storage_provider
1594 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1595 .and_then(|checkpoint| {
1596 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1597 })
1598 .unwrap_or(true);
1599
1600 if !account_history_exists {
1601 return Err(ProviderError::StateAtBlockPruned(database_start))
1602 }
1603
1604 let db_changesets =
1605 self.storage_provider.account_changesets_range(database_start..database_end)?;
1606 changesets.extend(db_changesets);
1607 }
1608
1609 changesets.sort_by_key(|(block_num, _)| *block_num);
1610
1611 Ok(changesets)
1612 }
1613}
1614
1615impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1616 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1618 let state_provider = self.latest_ref()?;
1620 state_provider.basic_account(address)
1621 }
1622}
1623
1624impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1625 type Receipt = ReceiptTy<N>;
1626
1627 fn get_state(
1637 &self,
1638 block: BlockNumber,
1639 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1640 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1641 let state = state.block_ref().execution_outcome().clone();
1642 Ok(Some(ExecutionOutcome::from((state, block))))
1643 } else {
1644 Self::get_state(self, block..=block)
1645 }
1646 }
1647}
1648
1649#[cfg(test)]
1650mod tests {
1651 use crate::{
1652 providers::blockchain_provider::BlockchainProvider,
1653 test_utils::create_test_provider_factory, BlockWriter,
1654 };
1655 use alloy_eips::BlockHashOrNumber;
1656 use alloy_primitives::B256;
1657 use itertools::Itertools;
1658 use rand::Rng;
1659 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1660 use reth_db_api::models::AccountBeforeTx;
1661 use reth_ethereum_primitives::Block;
1662 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1663 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1664 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1665 use reth_testing_utils::generators::{
1666 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1667 };
1668 use revm_database::BundleState;
1669 use std::{
1670 ops::{Bound, Range, RangeBounds},
1671 sync::Arc,
1672 };
1673
1674 const TEST_BLOCKS_COUNT: usize = 5;
1675
1676 fn random_blocks(
1677 rng: &mut impl Rng,
1678 database_blocks: usize,
1679 in_memory_blocks: usize,
1680 requests_count: Option<Range<u8>>,
1681 withdrawals_count: Option<Range<u8>>,
1682 tx_count: impl RangeBounds<u8>,
1683 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1684 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1685
1686 let tx_start = match tx_count.start_bound() {
1687 Bound::Included(&n) | Bound::Excluded(&n) => n,
1688 Bound::Unbounded => u8::MIN,
1689 };
1690 let tx_end = match tx_count.end_bound() {
1691 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1692 Bound::Unbounded => u8::MAX,
1693 };
1694
1695 let blocks = random_block_range(
1696 rng,
1697 0..=block_range,
1698 BlockRangeParams {
1699 parent: Some(B256::ZERO),
1700 tx_count: tx_start..tx_end,
1701 requests_count,
1702 withdrawals_count,
1703 },
1704 );
1705 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1706 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1707 }
1708
    /// Checks `find_block_by_hash` for blocks in the database, in the in-memory canonical chain
    /// and in the pending slot, across the `Any`, `Canonical` and `Pending` block sources.
    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        // Initialize random number generator and provider factory
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate random blocks for the range `0..=10` and split them: the first 5 go to the
        // database, the rest are kept for the in-memory state.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert the database blocks and commit them
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_block(
                &block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks the assertions below refer to
        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // Nothing was added to the in-memory state yet, so the first in-memory block is not
        // found under any source
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory chain
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                ..Default::default()
            }],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // Take a fresh consistent provider after the chain update
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found under `Any` and `Canonical`
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The database block is found under `Any` and `Canonical` as well
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into_block())
        );

        // A database block is not reported as pending
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Set the last in-memory block as the pending block
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
            recovered_block: Arc::new(RecoveredBlock::new_sealed(
                last_in_mem_block.clone(),
                Default::default(),
            )),
            ..Default::default()
        });

        // The pending block is found under `Pending` by the already existing consistent provider
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone_block())
        );

        Ok(())
    }
1819
    /// Checks `block` lookups by hash and by number for a block in the database and for a block
    /// committed to the in-memory canonical chain.
    #[test]
    fn test_block_reader_block() -> eyre::Result<()> {
        // Initialize random number generator and provider factory
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate random blocks for the range `0..=10` and split them: the first 5 go to the
        // database, the rest are kept for the in-memory state.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert the database blocks and commit them
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_block(
                &block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks the assertions below refer to
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let first_db_block = database_blocks.first().unwrap();

        // The in-memory block was not added yet, so neither lookup finds it
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            None
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory chain
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                ..Default::default()
            }],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);

        // Take a fresh consistent provider after the chain update
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found by hash and by number
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The database block is found by hash and by number
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
            Some(first_db_block.clone().into_block())
        );

        Ok(())
    }
1900
    /// Checks that `account_block_changeset` returns the expected account changesets both for
    /// the last block written to the database and for a block held in memory.
    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        // `TEST_BLOCKS_COUNT` blocks for the database and a single block for the in-memory state
        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        // Changesets for the database blocks, starting from the random accounts
        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        // Changesets for the in-memory block, starting from the state left by the database blocks
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        // Write the database blocks together with their state and account reverts
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets.iter().map(|block_changesets| {
                        block_changesets.iter().map(|(address, account, _)| {
                            (*address, Some(Some((*account).into())), [])
                        })
                    }),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        // Commit the in-memory block, carrying its changesets as reverts of the execution output
        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlock {
                        recovered_block: Arc::new(RecoveredBlock::new_sealed(
                            block.clone(),
                            senders,
                        )),
                        execution_output: Arc::new(BlockExecutionOutput {
                            state: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            result: BlockExecutionResult {
                                receipts: Default::default(),
                                requests: Default::default(),
                                gas_used: 0,
                                blob_gas_used: 0,
                            },
                        }),
                        ..Default::default()
                    }
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        let consistent_provider = provider.consistent_provider()?;

        // The last database block yields the last generated database changeset, sorted by address
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .next_back()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // The in-memory block yields the in-memory changeset, sorted by address
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
    /// Checks that the range-based `get_state` reports the slot's `present_value` as the value
    /// written to `PlainStorageState`, with `StorageSettings::v1()` set on the factory.
    #[test]
    fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
        use alloy_primitives::U256;
        use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
        use reth_primitives_traits::StorageEntry;
        use reth_storage_api::StorageSettingsCache;
        use std::collections::HashMap;

        // Fixed account and storage slot used throughout the test
        let address = alloy_primitives::Address::with_last_byte(1);
        let account = reth_primitives_traits::Account {
            nonce: 1,
            balance: U256::from(1000),
            bytecode_hash: None,
        };
        let slot = U256::from(0x42);
        let slot_b256 = B256::from(slot);

        let mut rng = generators::rng();
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v1());

        let blocks = random_block_range(
            &mut rng,
            0..=1,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );

        // Write both blocks; the reverts are empty for the first block and record the slot's
        // previous value (zero) for the second one
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    [(address, None, Some(account.into()), {
                        let mut s = HashMap::default();
                        s.insert(slot, (U256::ZERO, U256::from(100)));
                        s
                    })],
                    [
                        Vec::new(),
                        vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
                    ],
                    [],
                ),
                first_block: 0,
                ..Default::default()
            },
            Default::default(),
        )?;

        // Put the current slot value and account directly into the plain state tables
        provider_rw.tx_ref().put::<tables::PlainStorageState>(
            address,
            StorageEntry { key: slot_b256, value: U256::from(100) },
        )?;
        provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;

        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Read the execution outcome of block 1 through the range-based `get_state`
        let outcome =
            consistent_provider.get_state(1..=1)?.expect("should return execution outcome");

        let state = &outcome.bundle.state;
        let account_state = state.get(&address).expect("should have account in bundle state");
        let storage = &account_state.storage;

        let storage_slot = storage.get(&slot).expect("should have the slot in storage");

        assert_eq!(
            storage_slot.present_value,
            U256::from(100),
            "present_value should be 100 (the actual value in PlainStorageState)"
        );

        Ok(())
    }
2099
    /// Checks that `storage_changeset` returns the same plain (unhashed) slot key for a block
    /// read from the database and for a block held in memory, with `StorageSettings::v1()` set
    /// on the factory.
    #[test]
    fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
        use alloy_primitives::U256;
        use reth_db_api::models::StorageSettings;
        use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
        use std::collections::HashMap;

        let mut rng = generators::rng();
        let factory = create_test_provider_factory();
        factory.set_storage_settings_cache(StorageSettings::v1());

        // One block for the database and one block for the in-memory state
        let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);

        // Fixed account and storage slot used throughout the test
        let address = alloy_primitives::Address::with_last_byte(1);
        let account = reth_primitives_traits::Account {
            nonce: 1,
            balance: U256::from(1000),
            bytecode_hash: None,
        };
        let slot = U256::from(0x42);

        // Write the database block; its revert records zero as the slot's previous value
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    [(address, None, Some(account.into()), {
                        let mut s = HashMap::default();
                        s.insert(slot, (U256::ZERO, U256::from(100)));
                        s
                    })],
                    [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
                    [],
                ),
                first_block: 0,
                ..Default::default()
            },
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        // Commit the in-memory block; its revert records 100 as the slot's previous value
        let in_mem_block = in_memory_blocks.first().unwrap();
        let senders = in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    in_mem_block.clone(),
                    senders,
                )),
                execution_output: Arc::new(BlockExecutionOutput {
                    state: BundleState::new(
                        [(address, None, Some(account.into()), {
                            let mut s = HashMap::default();
                            s.insert(slot, (U256::from(100), U256::from(200)));
                            s
                        })],
                        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
                        [],
                    ),
                    result: BlockExecutionResult {
                        receipts: Default::default(),
                        requests: Default::default(),
                        gas_used: 0,
                        blob_gas_used: 0,
                    },
                }),
                ..Default::default()
            }],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        let consistent_provider = provider.consistent_provider()?;

        // Block 0 was written to the database, block 1 was committed to the in-memory chain
        let db_changeset = consistent_provider.storage_changeset(0)?;
        let mem_changeset = consistent_provider.storage_changeset(1)?;

        let slot_b256 = B256::from(slot);

        assert_eq!(db_changeset.len(), 1);
        assert_eq!(mem_changeset.len(), 1);

        let db_key = db_changeset[0].1.key;
        let mem_key = mem_changeset[0].1.key;

        assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
        assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
        assert_eq!(
            db_key, mem_key,
            "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
        );

        Ok(())
    }
2198
/// Verifies that `storage_changesets_range` returns the same plain (unhashed) storage key for
/// a slot regardless of whether the changeset was read from the database or from the
/// in-memory canonical state.
///
/// The provider factory is configured with `StorageSettings::v1()`; per the final assertion
/// message this is the configuration in which `use_hashed_state` is false (NOTE(review):
/// confirm against `StorageSettings`).
#[test]
fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
    use alloy_primitives::U256;
    use reth_db_api::models::StorageSettings;
    use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
    use std::collections::HashMap;

    let mut rng = generators::rng();
    let factory = create_test_provider_factory();
    factory.set_storage_settings_cache(StorageSettings::v1());

    // Presumably two blocks destined for the database and one kept in memory — verify
    // against `random_blocks`.
    let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);

    // Shared fixtures: one account with a single storage slot that changes in both layers.
    let address = alloy_primitives::Address::with_last_byte(1);
    let account = reth_primitives_traits::Account {
        nonce: 1,
        balance: U256::from(1000),
        bytecode_hash: None,
    };
    let slot = U256::from(0x42);

    // --- Database layer: the slot goes 0 -> 100, with a revert recorded for the first block
    // only (the second revert list is intentionally empty).
    let provider_rw = factory.provider_rw()?;

    let recovered_db_blocks = database_blocks
        .into_iter()
        .map(|block| block.try_recover().expect("failed to seal block with senders"))
        .collect();

    let mut db_storage = HashMap::default();
    db_storage.insert(slot, (U256::ZERO, U256::from(100)));

    let db_bundle = BundleState::new(
        [(address, None, Some(account.into()), db_storage)],
        vec![
            vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
            vec![],
        ],
        [],
    );
    let db_outcome = ExecutionOutcome { bundle: db_bundle, first_block: 0, ..Default::default() };

    provider_rw.append_blocks_with_state(recovered_db_blocks, &db_outcome, Default::default())?;
    provider_rw.commit()?;

    let provider = BlockchainProvider::new(factory)?;

    // --- In-memory layer: the same slot goes 100 -> 200 in the not-yet-persisted block.
    let pending_block = in_memory_blocks.first().unwrap();
    let pending_senders = pending_block.senders().expect("failed to recover senders");

    let mut mem_storage = HashMap::default();
    mem_storage.insert(slot, (U256::from(100), U256::from(200)));

    let mem_bundle = BundleState::new(
        [(address, None, Some(account.into()), mem_storage)],
        [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
        [],
    );

    let executed = ExecutedBlock {
        recovered_block: Arc::new(RecoveredBlock::new_sealed(
            pending_block.clone(),
            pending_senders,
        )),
        execution_output: Arc::new(BlockExecutionOutput {
            state: mem_bundle,
            result: BlockExecutionResult {
                receipts: Default::default(),
                requests: Default::default(),
                gas_used: 0,
                blob_gas_used: 0,
            },
        }),
        ..Default::default()
    };
    provider.canonical_in_memory_state.update_chain(NewCanonicalChain::Commit { new: vec![executed] });

    // --- Query a range spanning both layers through a single consistent view.
    let consistent_provider = provider.consistent_provider()?;
    let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;

    // Two entries are expected: the block given an empty revert list above contributes none.
    assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");

    let slot_b256 = B256::from(slot);
    let keys: Vec<B256> = all_changesets.iter().map(|entry| entry.1.key).collect();
    let (first_key, second_key) = (keys[0], keys[1]);

    assert_eq!(
        first_key, second_key,
        "same logical slot should produce identical keys whether from DB or memory"
    );
    assert_eq!(
        first_key, slot_b256,
        "keys should be plain/unhashed when use_hashed_state is false"
    );

    Ok(())
}
2298}