1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{
16 map::{hash_map, HashMap},
17 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30 StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
224 &self,
225 account_changeset: Vec<(u64, AccountBeforeTx)>,
226 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
227 block_range_end: BlockNumber,
228 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
229 let mut state: BundleStateInit = HashMap::default();
230 let mut reverts: RevertsInit = HashMap::default();
231 let state_provider = self.state_by_block_number_ref(block_range_end)?;
232
233 for (block_number, account_before) in account_changeset.into_iter().rev() {
235 let AccountBeforeTx { info: old_info, address } = account_before;
236 match state.entry(address) {
237 hash_map::Entry::Vacant(entry) => {
238 let new_info = state_provider.basic_account(&address)?;
239 entry.insert((old_info, new_info, HashMap::default()));
240 }
241 hash_map::Entry::Occupied(mut entry) => {
242 entry.get_mut().0 = old_info;
244 }
245 }
246 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
248 }
249
250 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
252 let BlockNumberAddress((block_number, address)) = block_and_address;
253 let account_state = match state.entry(address) {
255 hash_map::Entry::Vacant(entry) => {
256 let present_info = state_provider.basic_account(&address)?;
257 entry.insert((present_info, present_info, HashMap::default()))
258 }
259 hash_map::Entry::Occupied(entry) => entry.into_mut(),
260 };
261
262 match account_state.2.entry(old_storage.key) {
264 hash_map::Entry::Vacant(entry) => {
265 let new_storage_value =
266 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
267 entry.insert((old_storage.value, new_storage_value));
268 }
269 hash_map::Entry::Occupied(mut entry) => {
270 entry.get_mut().0 = old_storage.value;
271 }
272 };
273
274 reverts
275 .entry(block_number)
276 .or_default()
277 .entry(address)
278 .or_default()
279 .1
280 .push(old_storage);
281 }
282
283 Ok((state, reverts))
284 }
285
286 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298 &self,
299 range: impl RangeBounds<BlockNumber>,
300 fetch_db_range: F,
301 map_block_state_item: G,
302 mut predicate: P,
303 ) -> ProviderResult<Vec<T>>
304 where
305 F: FnOnce(
306 &DatabaseProviderRO<N::DB, N>,
307 RangeInclusive<BlockNumber>,
308 &mut P,
309 ) -> ProviderResult<Vec<T>>,
310 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311 P: FnMut(&T) -> bool,
312 {
313 let mut in_memory_chain =
321 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322 let db_provider = &self.storage_provider;
323
324 let (start, end) = self.convert_range_bounds(range, || {
325 in_memory_chain
327 .first()
328 .map(|b| b.number())
329 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330 });
331
332 if start > end {
333 return Ok(vec![])
334 }
335
336 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341 Some(lowest_memory_block) if lowest_memory_block <= end => {
342 let highest_memory_block =
343 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345 let in_memory_range =
349 lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351 in_memory_chain.truncate(
354 in_memory_chain
355 .len()
356 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357 );
358
359 let storage_range =
360 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362 (Some((in_memory_chain, in_memory_range)), storage_range)
363 }
364 _ => {
365 drop(in_memory_chain);
367
368 (None, Some(start..=end))
369 }
370 };
371
372 let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374 if let Some(storage_range) = storage_range {
375 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376 items.append(&mut db_items);
377
378 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381 return Ok(items)
382 }
383 }
384
385 if let Some((in_memory_chain, in_memory_range)) = in_memory {
386 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387 debug_assert!(num == block.number());
388 if let Some(item) = map_block_state_item(block, &mut predicate) {
389 items.push(item);
390 } else {
391 break
392 }
393 }
394 }
395
396 Ok(items)
397 }
398
399 fn block_state_provider_ref(
401 &self,
402 state: &BlockState<N::Primitives>,
403 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404 let anchor_hash = state.anchor().hash;
405 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408 }
409
410 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416 &self,
417 range: impl RangeBounds<BlockNumber>,
418 fetch_from_db: S,
419 fetch_from_block_state: M,
420 ) -> ProviderResult<Vec<R>>
421 where
422 S: FnOnce(
423 &DatabaseProviderRO<N::DB, N>,
424 RangeInclusive<TxNumber>,
425 ) -> ProviderResult<Vec<R>>,
426 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427 {
428 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429 let provider = &self.storage_provider;
430
431 let last_database_block_number = in_mem_chain
434 .last()
435 .map(|b| Ok(b.anchor().number))
436 .unwrap_or_else(|| provider.last_block_number())?;
437
438 let last_block_body_index = provider
441 .block_body_indices(last_database_block_number)?
442 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445 let (start, end) = self.convert_range_bounds(range, || {
446 in_mem_chain
447 .iter()
448 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449 .sum::<u64>() +
450 last_block_body_index.last_tx_num()
451 });
452
453 if start > end {
454 return Ok(vec![])
455 }
456
457 let mut tx_range = start..=end;
458
459 if *tx_range.end() < in_memory_tx_num {
462 return fetch_from_db(provider, tx_range);
463 }
464
465 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467 if *tx_range.start() < in_memory_tx_num {
469 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472 tx_range = in_memory_tx_num..=*tx_range.end();
474
475 items.extend(fetch_from_db(provider, db_range)?);
476 }
477
478 for block_state in in_mem_chain.iter().rev() {
480 let block_tx_count =
481 block_state.block_ref().recovered_block().body().transactions().len();
482 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487 in_memory_tx_num += block_tx_count as u64;
488 continue
489 }
490
491 let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494 items.extend(fetch_from_block_state(
495 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496 block_state,
497 )?);
498
499 in_memory_tx_num += block_tx_count as u64;
500
501 if in_memory_tx_num > *tx_range.end() {
503 break
504 }
505
506 tx_range = in_memory_tx_num..=*tx_range.end();
508 }
509
510 Ok(items)
511 }
512
513 fn get_in_memory_or_storage_by_tx<S, M, R>(
516 &self,
517 id: HashOrNumber,
518 fetch_from_db: S,
519 fetch_from_block_state: M,
520 ) -> ProviderResult<Option<R>>
521 where
522 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524 {
525 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526 let provider = &self.storage_provider;
527
528 let last_database_block_number = in_mem_chain
531 .last()
532 .map(|b| Ok(b.anchor().number))
533 .unwrap_or_else(|| provider.last_block_number())?;
534
535 let last_block_body_index = provider
538 .block_body_indices(last_database_block_number)?
539 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542 if let HashOrNumber::Number(id) = id &&
545 id < in_memory_tx_num
546 {
547 return fetch_from_db(provider)
548 }
549
550 for block_state in in_mem_chain.iter().rev() {
552 let executed_block = block_state.block_ref();
553 let block = executed_block.recovered_block();
554
555 for tx_index in 0..block.body().transactions().len() {
556 match id {
557 HashOrNumber::Hash(tx_hash) => {
558 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 HashOrNumber::Number(id) => {
563 if id == in_memory_tx_num {
564 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565 }
566 }
567 }
568
569 in_memory_tx_num += 1;
570 }
571 }
572
573 if let HashOrNumber::Hash(_) = id {
575 return fetch_from_db(provider)
576 }
577
578 Ok(None)
579 }
580
581 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583 &self,
584 id: BlockHashOrNumber,
585 fetch_from_db: S,
586 fetch_from_block_state: M,
587 ) -> ProviderResult<R>
588 where
589 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591 {
592 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593 return fetch_from_block_state(block_state)
594 }
595 fetch_from_db(&self.storage_provider)
596 }
597
598 pub(crate) fn into_state_provider_at_block_hash(
600 self,
601 block_hash: BlockHash,
602 ) -> ProviderResult<StateProviderBox> {
603 let block_number =
605 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
606 self.ensure_canonical_block(block_number)?;
607
608 let Self { storage_provider, head_block, .. } = self;
609 if let Some(Some(block_state)) =
610 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
611 {
612 let anchor_hash = block_state.anchor().hash;
613 let block_number = storage_provider
614 .block_number(anchor_hash)?
615 .ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
616 let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
617 return Ok(Box::new(block_state.state_provider(latest_historical)));
618 }
619 storage_provider.try_into_history_at_block(block_number)
620 }
621}
622
623impl<N: ProviderNodeTypes> ConsistentProvider<N> {
624 #[inline]
633 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
634 let latest = self.best_block_number()?;
635 if block_number > latest {
636 Err(ProviderError::HeaderNotFound(block_number.into()))
637 } else {
638 Ok(())
639 }
640 }
641}
642
643impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
644 type Primitives = N::Primitives;
645}
646
647impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
648 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
649 self.storage_provider.static_file_provider()
650 }
651
652 fn get_static_file_writer(
653 &self,
654 block: BlockNumber,
655 segment: StaticFileSegment,
656 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
657 self.storage_provider.get_static_file_writer(block, segment)
658 }
659}
660
661impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
662 type Header = HeaderTy<N>;
663
664 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
665 self.get_in_memory_or_storage_by_block(
666 block_hash.into(),
667 |db_provider| db_provider.header(block_hash),
668 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
669 )
670 }
671
672 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
673 self.get_in_memory_or_storage_by_block(
674 num.into(),
675 |db_provider| db_provider.header_by_number(num),
676 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
677 )
678 }
679
680 fn headers_range(
681 &self,
682 range: impl RangeBounds<BlockNumber>,
683 ) -> ProviderResult<Vec<Self::Header>> {
684 self.get_in_memory_or_storage_by_block_range_while(
685 range,
686 |db_provider, range, _| db_provider.headers_range(range),
687 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
688 |_| true,
689 )
690 }
691
692 fn sealed_header(
693 &self,
694 number: BlockNumber,
695 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
696 self.get_in_memory_or_storage_by_block(
697 number.into(),
698 |db_provider| db_provider.sealed_header(number),
699 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
700 )
701 }
702
703 fn sealed_headers_range(
704 &self,
705 range: impl RangeBounds<BlockNumber>,
706 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
707 self.get_in_memory_or_storage_by_block_range_while(
708 range,
709 |db_provider, range, _| db_provider.sealed_headers_range(range),
710 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
711 |_| true,
712 )
713 }
714
715 fn sealed_headers_while(
716 &self,
717 range: impl RangeBounds<BlockNumber>,
718 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
719 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
720 self.get_in_memory_or_storage_by_block_range_while(
721 range,
722 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
723 |block_state, predicate| {
724 let header = block_state.block_ref().recovered_block().sealed_header();
725 predicate(header).then(|| header.clone())
726 },
727 predicate,
728 )
729 }
730}
731
732impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
733 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
734 self.get_in_memory_or_storage_by_block(
735 number.into(),
736 |db_provider| db_provider.block_hash(number),
737 |block_state| Ok(Some(block_state.hash())),
738 )
739 }
740
741 fn canonical_hashes_range(
742 &self,
743 start: BlockNumber,
744 end: BlockNumber,
745 ) -> ProviderResult<Vec<B256>> {
746 self.get_in_memory_or_storage_by_block_range_while(
747 start..end,
748 |db_provider, inclusive_range, _| {
749 db_provider
750 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
751 },
752 |block_state, _| Some(block_state.hash()),
753 |_| true,
754 )
755 }
756}
757
758impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
759 fn chain_info(&self) -> ProviderResult<ChainInfo> {
760 let best_number = self.best_block_number()?;
761 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
762 }
763
764 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
765 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
766 }
767
768 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
769 self.storage_provider.last_block_number()
770 }
771
772 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
773 self.get_in_memory_or_storage_by_block(
774 hash.into(),
775 |db_provider| db_provider.block_number(hash),
776 |block_state| Ok(Some(block_state.number())),
777 )
778 }
779}
780
781impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
782 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
783 Ok(self.canonical_in_memory_state.pending_block_num_hash())
784 }
785
786 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787 Ok(self.canonical_in_memory_state.get_safe_num_hash())
788 }
789
790 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
791 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
792 }
793}
794
795impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
796 type Block = BlockTy<N>;
797
798 fn find_block_by_hash(
799 &self,
800 hash: B256,
801 source: BlockSource,
802 ) -> ProviderResult<Option<Self::Block>> {
803 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
804 let Some(block) = self.get_in_memory_or_storage_by_block(
805 hash.into(),
806 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
807 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
808 )?
809 {
810 return Ok(Some(block))
811 }
812
813 if matches!(source, BlockSource::Pending | BlockSource::Any) {
814 return Ok(self
815 .canonical_in_memory_state
816 .pending_block()
817 .filter(|b| b.hash() == hash)
818 .map(|b| b.into_block()))
819 }
820
821 Ok(None)
822 }
823
824 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
825 self.get_in_memory_or_storage_by_block(
826 id,
827 |db_provider| db_provider.block(id),
828 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
829 )
830 }
831
832 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
833 Ok(self.canonical_in_memory_state.pending_recovered_block())
834 }
835
836 fn pending_block_and_receipts(
837 &self,
838 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
839 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
840 }
841
842 fn recovered_block(
849 &self,
850 id: BlockHashOrNumber,
851 transaction_kind: TransactionVariant,
852 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
853 self.get_in_memory_or_storage_by_block(
854 id,
855 |db_provider| db_provider.recovered_block(id, transaction_kind),
856 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
857 )
858 }
859
860 fn sealed_block_with_senders(
861 &self,
862 id: BlockHashOrNumber,
863 transaction_kind: TransactionVariant,
864 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
865 self.get_in_memory_or_storage_by_block(
866 id,
867 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
868 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
869 )
870 }
871
872 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
873 self.get_in_memory_or_storage_by_block_range_while(
874 range,
875 |db_provider, range, _| db_provider.block_range(range),
876 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
877 |_| true,
878 )
879 }
880
881 fn block_with_senders_range(
882 &self,
883 range: RangeInclusive<BlockNumber>,
884 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
885 self.get_in_memory_or_storage_by_block_range_while(
886 range,
887 |db_provider, range, _| db_provider.block_with_senders_range(range),
888 |block_state, _| Some(block_state.block().recovered_block().clone()),
889 |_| true,
890 )
891 }
892
893 fn recovered_block_range(
894 &self,
895 range: RangeInclusive<BlockNumber>,
896 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
897 self.get_in_memory_or_storage_by_block_range_while(
898 range,
899 |db_provider, range, _| db_provider.recovered_block_range(range),
900 |block_state, _| Some(block_state.block().recovered_block().clone()),
901 |_| true,
902 )
903 }
904
905 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
906 self.get_in_memory_or_storage_by_tx(
907 id.into(),
908 |db_provider| db_provider.block_by_transaction_id(id),
909 |_, _, block_state| Ok(Some(block_state.number())),
910 )
911 }
912}
913
914impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
915 type Transaction = TxTy<N>;
916
917 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
918 self.get_in_memory_or_storage_by_tx(
919 tx_hash.into(),
920 |db_provider| db_provider.transaction_id(tx_hash),
921 |_, tx_number, _| Ok(Some(tx_number)),
922 )
923 }
924
925 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
926 self.get_in_memory_or_storage_by_tx(
927 id.into(),
928 |provider| provider.transaction_by_id(id),
929 |tx_index, _, block_state| {
930 Ok(block_state
931 .block_ref()
932 .recovered_block()
933 .body()
934 .transactions()
935 .get(tx_index)
936 .cloned())
937 },
938 )
939 }
940
941 fn transaction_by_id_unhashed(
942 &self,
943 id: TxNumber,
944 ) -> ProviderResult<Option<Self::Transaction>> {
945 self.get_in_memory_or_storage_by_tx(
946 id.into(),
947 |provider| provider.transaction_by_id_unhashed(id),
948 |tx_index, _, block_state| {
949 Ok(block_state
950 .block_ref()
951 .recovered_block()
952 .body()
953 .transactions()
954 .get(tx_index)
955 .cloned())
956 },
957 )
958 }
959
960 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
961 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
962 return Ok(Some(tx))
963 }
964
965 self.storage_provider.transaction_by_hash(hash)
966 }
967
968 fn transaction_by_hash_with_meta(
969 &self,
970 tx_hash: TxHash,
971 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
972 if let Some((tx, meta)) =
973 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
974 {
975 return Ok(Some((tx, meta)))
976 }
977
978 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
979 }
980
981 fn transactions_by_block(
982 &self,
983 id: BlockHashOrNumber,
984 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
985 self.get_in_memory_or_storage_by_block(
986 id,
987 |provider| provider.transactions_by_block(id),
988 |block_state| {
989 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
990 },
991 )
992 }
993
994 fn transactions_by_block_range(
995 &self,
996 range: impl RangeBounds<BlockNumber>,
997 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
998 self.get_in_memory_or_storage_by_block_range_while(
999 range,
1000 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1001 |block_state, _| {
1002 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1003 },
1004 |_| true,
1005 )
1006 }
1007
1008 fn transactions_by_tx_range(
1009 &self,
1010 range: impl RangeBounds<TxNumber>,
1011 ) -> ProviderResult<Vec<Self::Transaction>> {
1012 self.get_in_memory_or_storage_by_tx_range(
1013 range,
1014 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1015 |index_range, block_state| {
1016 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1017 .to_vec())
1018 },
1019 )
1020 }
1021
1022 fn senders_by_tx_range(
1023 &self,
1024 range: impl RangeBounds<TxNumber>,
1025 ) -> ProviderResult<Vec<Address>> {
1026 self.get_in_memory_or_storage_by_tx_range(
1027 range,
1028 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1029 |index_range, block_state| {
1030 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1031 },
1032 )
1033 }
1034
1035 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036 self.get_in_memory_or_storage_by_tx(
1037 id.into(),
1038 |provider| provider.transaction_sender(id),
1039 |tx_index, _, block_state| {
1040 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1041 },
1042 )
1043 }
1044}
1045
1046impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1047 type Receipt = ReceiptTy<N>;
1048
1049 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1050 self.get_in_memory_or_storage_by_tx(
1051 id.into(),
1052 |provider| provider.receipt(id),
1053 |tx_index, _, block_state| {
1054 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1055 },
1056 )
1057 }
1058
1059 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1060 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1061 let executed_block = block_state.block_ref();
1062 let block = executed_block.recovered_block();
1063 let receipts = block_state.executed_block_receipts_ref();
1064
1065 debug_assert_eq!(
1067 block.body().transactions().len(),
1068 receipts.len(),
1069 "Mismatch between transaction and receipt count"
1070 );
1071
1072 if let Some(tx_index) =
1073 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1074 {
1075 return Ok(receipts.get(tx_index).cloned());
1077 }
1078 }
1079
1080 self.storage_provider.receipt_by_hash(hash)
1081 }
1082
1083 fn receipts_by_block(
1084 &self,
1085 block: BlockHashOrNumber,
1086 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1087 self.get_in_memory_or_storage_by_block(
1088 block,
1089 |db_provider| db_provider.receipts_by_block(block),
1090 |block_state| Ok(Some(block_state.executed_block_receipts())),
1091 )
1092 }
1093
1094 fn receipts_by_tx_range(
1095 &self,
1096 range: impl RangeBounds<TxNumber>,
1097 ) -> ProviderResult<Vec<Self::Receipt>> {
1098 self.get_in_memory_or_storage_by_tx_range(
1099 range,
1100 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1101 |index_range, block_state| {
1102 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1103 },
1104 )
1105 }
1106
1107 fn receipts_by_block_range(
1108 &self,
1109 block_range: RangeInclusive<BlockNumber>,
1110 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1111 self.storage_provider.receipts_by_block_range(block_range)
1112 }
1113}
1114
1115impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1116 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1117 match block {
1118 BlockId::Hash(rpc_block_hash) => {
1119 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1120 if receipts.is_none() &&
1121 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1122 let Some(state) = self
1123 .head_block
1124 .as_ref()
1125 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1126 {
1127 receipts = Some(state.executed_block_receipts());
1128 }
1129 Ok(receipts)
1130 }
1131 BlockId::Number(num_tag) => match num_tag {
1132 BlockNumberOrTag::Pending => Ok(self
1133 .canonical_in_memory_state
1134 .pending_state()
1135 .map(|block_state| block_state.executed_block_receipts())),
1136 _ => {
1137 if let Some(num) = self.convert_block_number(num_tag)? {
1138 self.receipts_by_block(num.into())
1139 } else {
1140 Ok(None)
1141 }
1142 }
1143 },
1144 }
1145 }
1146}
1147
1148impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1149 fn block_body_indices(
1150 &self,
1151 number: BlockNumber,
1152 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1153 self.get_in_memory_or_storage_by_block(
1154 number.into(),
1155 |db_provider| db_provider.block_body_indices(number),
1156 |block_state| {
1157 let last_storage_block_number = block_state.anchor().number;
1159 let mut stored_indices = self
1160 .storage_provider
1161 .block_body_indices(last_storage_block_number)?
1162 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1163
1164 stored_indices.first_tx_num = stored_indices.next_tx_num();
1166 stored_indices.tx_count = 0;
1167
1168 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1170 let block_tx_count =
1171 state.block_ref().recovered_block().body().transactions().len() as u64;
1172 if state.block_ref().recovered_block().number() == number {
1173 stored_indices.tx_count = block_tx_count;
1174 } else {
1175 stored_indices.first_tx_num += block_tx_count;
1176 }
1177 }
1178
1179 Ok(Some(stored_indices))
1180 },
1181 )
1182 }
1183
1184 fn block_body_indices_range(
1185 &self,
1186 range: RangeInclusive<BlockNumber>,
1187 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1188 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1189 }
1190}
1191
1192impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1193 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1194 self.storage_provider.get_stage_checkpoint(id)
1195 }
1196
1197 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1198 self.storage_provider.get_stage_checkpoint_progress(id)
1199 }
1200
1201 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1202 self.storage_provider.get_all_checkpoints()
1203 }
1204}
1205
1206impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1207 fn get_prune_checkpoint(
1208 &self,
1209 segment: PruneSegment,
1210 ) -> ProviderResult<Option<PruneCheckpoint>> {
1211 self.storage_provider.get_prune_checkpoint(segment)
1212 }
1213
1214 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1215 self.storage_provider.get_prune_checkpoints()
1216 }
1217}
1218
1219impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1220 type ChainSpec = N::ChainSpec;
1221
1222 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1223 ChainSpecProvider::chain_spec(&self.storage_provider)
1224 }
1225}
1226
1227impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1228 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1229 match id {
1230 BlockId::Number(num) => self.block_by_number_or_tag(num),
1231 BlockId::Hash(hash) => {
1232 if Some(true) == hash.require_canonical {
1237 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1239 } else {
1240 self.block_by_hash(hash.block_hash)
1241 }
1242 }
1243 }
1244 }
1245
1246 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1247 Ok(match id {
1248 BlockNumberOrTag::Latest => {
1249 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1250 }
1251 BlockNumberOrTag::Finalized => {
1252 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1253 }
1254 BlockNumberOrTag::Safe => {
1255 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1256 }
1257 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1258 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1259
1260 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1261 })
1262 }
1263
1264 fn sealed_header_by_number_or_tag(
1265 &self,
1266 id: BlockNumberOrTag,
1267 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1268 match id {
1269 BlockNumberOrTag::Latest => {
1270 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1271 }
1272 BlockNumberOrTag::Finalized => {
1273 Ok(self.canonical_in_memory_state.get_finalized_header())
1274 }
1275 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1276 BlockNumberOrTag::Earliest => self
1277 .header_by_number(self.earliest_block_number()?)?
1278 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1279 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1280 BlockNumberOrTag::Number(num) => self
1281 .header_by_number(num)?
1282 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1283 }
1284 }
1285
1286 fn sealed_header_by_id(
1287 &self,
1288 id: BlockId,
1289 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1290 Ok(match id {
1291 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1292 BlockId::Hash(hash) => self
1293 .header(hash.block_hash)?
1294 .map(|header| SealedHeader::new(header, hash.block_hash)),
1295 })
1296 }
1297
1298 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1299 Ok(match id {
1300 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1301 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1302 })
1303 }
1304}
1305
1306impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1307 fn storage_changeset(
1308 &self,
1309 block_number: BlockNumber,
1310 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1311 if let Some(state) =
1312 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1313 {
1314 let changesets = state
1315 .block()
1316 .execution_output
1317 .state
1318 .reverts
1319 .to_plain_state_reverts()
1320 .storage
1321 .into_iter()
1322 .flatten()
1323 .flat_map(|revert: PlainStorageRevert| {
1324 revert.storage_revert.into_iter().map(move |(key, value)| {
1325 let plain_key = B256::from(key.to_be_bytes());
1326 (
1327 BlockNumberAddress((block_number, revert.address)),
1328 StorageEntry { key: plain_key, value: value.to_previous_value() },
1329 )
1330 })
1331 })
1332 .collect();
1333 Ok(changesets)
1334 } else {
1335 let storage_history_exists = self
1339 .storage_provider
1340 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1341 .and_then(|checkpoint| {
1342 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1347 })
1348 .unwrap_or(true);
1349
1350 if !storage_history_exists {
1351 return Err(ProviderError::StateAtBlockPruned(block_number))
1352 }
1353
1354 self.storage_provider.storage_changeset(block_number)
1355 }
1356 }
1357
1358 fn get_storage_before_block(
1359 &self,
1360 block_number: BlockNumber,
1361 address: Address,
1362 storage_key: B256,
1363 ) -> ProviderResult<Option<StorageEntry>> {
1364 if let Some(state) =
1365 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1366 {
1367 let changeset = state
1368 .block_ref()
1369 .execution_output
1370 .state
1371 .reverts
1372 .to_plain_state_reverts()
1373 .storage
1374 .into_iter()
1375 .flatten()
1376 .find_map(|revert: PlainStorageRevert| {
1377 if revert.address != address {
1378 return None
1379 }
1380 revert.storage_revert.into_iter().find_map(|(key, value)| {
1381 let plain_key = B256::from(key.to_be_bytes());
1382 (plain_key == storage_key).then(|| StorageEntry {
1383 key: plain_key,
1384 value: value.to_previous_value(),
1385 })
1386 })
1387 });
1388 Ok(changeset)
1389 } else {
1390 let storage_history_exists = self
1391 .storage_provider
1392 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1393 .and_then(|checkpoint| {
1394 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1395 })
1396 .unwrap_or(true);
1397
1398 if !storage_history_exists {
1399 return Err(ProviderError::StateAtBlockPruned(block_number))
1400 }
1401
1402 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1403 }
1404 }
1405
1406 fn storage_changesets_range(
1407 &self,
1408 range: impl RangeBounds<BlockNumber>,
1409 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1410 let range = to_range(range);
1411 let mut changesets = Vec::new();
1412 let database_start = range.start;
1413 let mut database_end = range.end;
1414
1415 if let Some(head_block) = &self.head_block {
1416 database_end = head_block.anchor().number;
1417
1418 for state in head_block.chain() {
1419 let block_changesets = state
1420 .block_ref()
1421 .execution_output
1422 .state
1423 .reverts
1424 .to_plain_state_reverts()
1425 .storage
1426 .into_iter()
1427 .flatten()
1428 .flat_map(|revert: PlainStorageRevert| {
1429 revert.storage_revert.into_iter().map(move |(key, value)| {
1430 let plain_key = B256::from(key.to_be_bytes());
1431 (
1432 BlockNumberAddress((state.number(), revert.address)),
1433 StorageEntry { key: plain_key, value: value.to_previous_value() },
1434 )
1435 })
1436 });
1437
1438 changesets.extend(block_changesets);
1439 }
1440 }
1441
1442 if database_start < database_end {
1443 let storage_history_exists = self
1444 .storage_provider
1445 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1446 .and_then(|checkpoint| {
1447 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1448 })
1449 .unwrap_or(true);
1450
1451 if !storage_history_exists {
1452 return Err(ProviderError::StateAtBlockPruned(database_start))
1453 }
1454
1455 let db_changesets = self
1456 .storage_provider
1457 .storage_changesets_range(database_start..=database_end - 1)?;
1458 changesets.extend(db_changesets);
1459 }
1460
1461 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1462
1463 Ok(changesets)
1464 }
1465
1466 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1467 let mut count = 0;
1468 if let Some(head_block) = &self.head_block {
1469 for state in head_block.chain() {
1470 count += state
1471 .block_ref()
1472 .execution_output
1473 .state
1474 .reverts
1475 .iter()
1476 .flatten()
1477 .map(|(_, revert)| revert.storage.len())
1478 .sum::<usize>();
1479 }
1480 }
1481
1482 count += self.storage_provider.storage_changeset_count()?;
1483
1484 Ok(count)
1485 }
1486}
1487
1488impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1489 fn account_block_changeset(
1490 &self,
1491 block_number: BlockNumber,
1492 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1493 if let Some(state) =
1494 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1495 {
1496 let changesets = state
1497 .block_ref()
1498 .execution_output
1499 .state
1500 .reverts
1501 .to_plain_state_reverts()
1502 .accounts
1503 .into_iter()
1504 .flatten()
1505 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1506 .collect();
1507 Ok(changesets)
1508 } else {
1509 let account_history_exists = self
1513 .storage_provider
1514 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1515 .and_then(|checkpoint| {
1516 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1521 })
1522 .unwrap_or(true);
1523
1524 if !account_history_exists {
1525 return Err(ProviderError::StateAtBlockPruned(block_number))
1526 }
1527
1528 self.storage_provider.account_block_changeset(block_number)
1529 }
1530 }
1531
1532 fn get_account_before_block(
1533 &self,
1534 block_number: BlockNumber,
1535 address: Address,
1536 ) -> ProviderResult<Option<AccountBeforeTx>> {
1537 if let Some(state) =
1538 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1539 {
1540 let changeset = state
1542 .block_ref()
1543 .execution_output
1544 .state
1545 .reverts
1546 .to_plain_state_reverts()
1547 .accounts
1548 .into_iter()
1549 .flatten()
1550 .find(|(addr, _)| addr == &address)
1551 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1552 Ok(changeset)
1553 } else {
1554 let account_history_exists = self
1557 .storage_provider
1558 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1559 .and_then(|checkpoint| {
1560 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1565 })
1566 .unwrap_or(true);
1567
1568 if !account_history_exists {
1569 return Err(ProviderError::StateAtBlockPruned(block_number))
1570 }
1571
1572 self.storage_provider.get_account_before_block(block_number, address)
1574 }
1575 }
1576
1577 fn account_changesets_range(
1578 &self,
1579 range: impl core::ops::RangeBounds<BlockNumber>,
1580 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1581 let range = to_range(range);
1582 let mut changesets = Vec::new();
1583 let database_start = range.start;
1584 let mut database_end = range.end;
1585
1586 if let Some(head_block) = &self.head_block {
1588 database_end = head_block.anchor().number;
1590
1591 for state in head_block.chain() {
1592 let block_changesets = state
1594 .block_ref()
1595 .execution_output
1596 .state
1597 .reverts
1598 .to_plain_state_reverts()
1599 .accounts
1600 .into_iter()
1601 .flatten()
1602 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1603
1604 for changeset in block_changesets {
1605 changesets.push((state.number(), changeset));
1606 }
1607 }
1608 }
1609
1610 if database_start < database_end {
1612 let account_history_exists = self
1614 .storage_provider
1615 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1616 .and_then(|checkpoint| {
1617 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1618 })
1619 .unwrap_or(true);
1620
1621 if !account_history_exists {
1622 return Err(ProviderError::StateAtBlockPruned(database_start))
1623 }
1624
1625 let db_changesets =
1626 self.storage_provider.account_changesets_range(database_start..database_end)?;
1627 changesets.extend(db_changesets);
1628 }
1629
1630 changesets.sort_by_key(|(block_num, _)| *block_num);
1631
1632 Ok(changesets)
1633 }
1634
1635 fn account_changeset_count(&self) -> ProviderResult<usize> {
1636 let mut count = 0;
1638 if let Some(head_block) = &self.head_block {
1639 for state in head_block.chain() {
1640 count += state.block_ref().execution_output.state.reverts.len();
1641 }
1642 }
1643
1644 count += self.storage_provider.account_changeset_count()?;
1646
1647 Ok(count)
1648 }
1649}
1650
1651impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1652 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1654 let state_provider = self.latest_ref()?;
1656 state_provider.basic_account(address)
1657 }
1658}
1659
1660impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1661 type Receipt = ReceiptTy<N>;
1662
1663 fn get_state(
1673 &self,
1674 block: BlockNumber,
1675 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1676 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1677 let state = state.block_ref().execution_outcome().clone();
1678 Ok(Some(ExecutionOutcome::from((state, block))))
1679 } else {
1680 Self::get_state(self, block..=block)
1681 }
1682 }
1683}
1684
1685#[cfg(test)]
1686mod tests {
1687 use crate::{
1688 providers::blockchain_provider::BlockchainProvider,
1689 test_utils::create_test_provider_factory, BlockWriter,
1690 };
1691 use alloy_eips::BlockHashOrNumber;
1692 use alloy_primitives::B256;
1693 use itertools::Itertools;
1694 use rand::Rng;
1695 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1696 use reth_db_api::models::AccountBeforeTx;
1697 use reth_ethereum_primitives::Block;
1698 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1699 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1700 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1701 use reth_testing_utils::generators::{
1702 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1703 };
1704 use revm_database::BundleState;
1705 use std::{
1706 ops::{Bound, Range, RangeBounds},
1707 sync::Arc,
1708 };
1709
1710 const TEST_BLOCKS_COUNT: usize = 5;
1711
1712 fn random_blocks(
1713 rng: &mut impl Rng,
1714 database_blocks: usize,
1715 in_memory_blocks: usize,
1716 requests_count: Option<Range<u8>>,
1717 withdrawals_count: Option<Range<u8>>,
1718 tx_count: impl RangeBounds<u8>,
1719 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1720 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1721
1722 let tx_start = match tx_count.start_bound() {
1723 Bound::Included(&n) | Bound::Excluded(&n) => n,
1724 Bound::Unbounded => u8::MIN,
1725 };
1726 let tx_end = match tx_count.end_bound() {
1727 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1728 Bound::Unbounded => u8::MAX,
1729 };
1730
1731 let blocks = random_block_range(
1732 rng,
1733 0..=block_range,
1734 BlockRangeParams {
1735 parent: Some(B256::ZERO),
1736 tx_count: tx_start..tx_end,
1737 requests_count,
1738 withdrawals_count,
1739 },
1740 );
1741 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1742 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1743 }
1744
1745 #[test]
1746 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1747 let mut rng = generators::rng();
1749 let factory = create_test_provider_factory();
1750
1751 let blocks = random_block_range(
1753 &mut rng,
1754 0..=10,
1755 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1756 );
1757 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1758
1759 let provider_rw = factory.provider_rw()?;
1761 for block in database_blocks {
1762 provider_rw.insert_block(
1763 &block.clone().try_recover().expect("failed to seal block with senders"),
1764 )?;
1765 }
1766 provider_rw.commit()?;
1767
1768 let provider = BlockchainProvider::new(factory)?;
1770 let consistent_provider = provider.consistent_provider()?;
1771
1772 let first_db_block = database_blocks.first().unwrap();
1774 let first_in_mem_block = in_memory_blocks.first().unwrap();
1775 let last_in_mem_block = in_memory_blocks.last().unwrap();
1776
1777 assert_eq!(
1779 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1780 None
1781 );
1782 assert_eq!(
1783 consistent_provider
1784 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1785 None
1786 );
1787 assert_eq!(
1789 consistent_provider
1790 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1791 None
1792 );
1793
1794 let in_memory_block_senders =
1796 first_in_mem_block.senders().expect("failed to recover senders");
1797 let chain = NewCanonicalChain::Commit {
1798 new: vec![ExecutedBlock {
1799 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1800 first_in_mem_block.clone(),
1801 in_memory_block_senders,
1802 )),
1803 ..Default::default()
1804 }],
1805 };
1806 consistent_provider.canonical_in_memory_state.update_chain(chain);
1807 let consistent_provider = provider.consistent_provider()?;
1808
1809 assert_eq!(
1811 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1812 Some(first_in_mem_block.clone().into_block())
1813 );
1814 assert_eq!(
1815 consistent_provider
1816 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1817 Some(first_in_mem_block.clone().into_block())
1818 );
1819
1820 assert_eq!(
1822 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1823 Some(first_db_block.clone().into_block())
1824 );
1825 assert_eq!(
1826 consistent_provider
1827 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1828 Some(first_db_block.clone().into_block())
1829 );
1830
1831 assert_eq!(
1833 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1834 None
1835 );
1836
1837 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1839 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1840 last_in_mem_block.clone(),
1841 Default::default(),
1842 )),
1843 ..Default::default()
1844 });
1845
1846 assert_eq!(
1848 consistent_provider
1849 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1850 Some(last_in_mem_block.clone_block())
1851 );
1852
1853 Ok(())
1854 }
1855
1856 #[test]
1857 fn test_block_reader_block() -> eyre::Result<()> {
1858 let mut rng = generators::rng();
1860 let factory = create_test_provider_factory();
1861
1862 let blocks = random_block_range(
1864 &mut rng,
1865 0..=10,
1866 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1867 );
1868 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1869
1870 let provider_rw = factory.provider_rw()?;
1872 for block in database_blocks {
1873 provider_rw.insert_block(
1874 &block.clone().try_recover().expect("failed to seal block with senders"),
1875 )?;
1876 }
1877 provider_rw.commit()?;
1878
1879 let provider = BlockchainProvider::new(factory)?;
1881 let consistent_provider = provider.consistent_provider()?;
1882
1883 let first_in_mem_block = in_memory_blocks.first().unwrap();
1885 let first_db_block = database_blocks.first().unwrap();
1887
1888 assert_eq!(
1890 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1891 None
1892 );
1893 assert_eq!(
1894 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1895 None
1896 );
1897
1898 let in_memory_block_senders =
1900 first_in_mem_block.senders().expect("failed to recover senders");
1901 let chain = NewCanonicalChain::Commit {
1902 new: vec![ExecutedBlock {
1903 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1904 first_in_mem_block.clone(),
1905 in_memory_block_senders,
1906 )),
1907 ..Default::default()
1908 }],
1909 };
1910 consistent_provider.canonical_in_memory_state.update_chain(chain);
1911
1912 let consistent_provider = provider.consistent_provider()?;
1913
1914 assert_eq!(
1916 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1917 Some(first_in_mem_block.clone().into_block())
1918 );
1919 assert_eq!(
1920 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1921 Some(first_in_mem_block.clone().into_block())
1922 );
1923
1924 assert_eq!(
1926 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1927 Some(first_db_block.clone().into_block())
1928 );
1929 assert_eq!(
1930 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1931 Some(first_db_block.clone().into_block())
1932 );
1933
1934 Ok(())
1935 }
1936
1937 #[test]
1938 fn test_changeset_reader() -> eyre::Result<()> {
1939 let mut rng = generators::rng();
1940
1941 let (database_blocks, in_memory_blocks) =
1942 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1943
1944 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1945 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1946 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1947
1948 let accounts = random_eoa_accounts(&mut rng, 2);
1949
1950 let (database_changesets, database_state) = random_changeset_range(
1951 &mut rng,
1952 &database_blocks,
1953 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1954 0..0,
1955 0..0,
1956 );
1957 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1958 &mut rng,
1959 &in_memory_blocks,
1960 database_state
1961 .iter()
1962 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1963 0..0,
1964 0..0,
1965 );
1966
1967 let factory = create_test_provider_factory();
1968
1969 let provider_rw = factory.provider_rw()?;
1970 provider_rw.append_blocks_with_state(
1971 database_blocks
1972 .into_iter()
1973 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1974 .collect(),
1975 &ExecutionOutcome {
1976 bundle: BundleState::new(
1977 database_state.into_iter().map(|(address, (account, _))| {
1978 (address, None, Some(account.into()), Default::default())
1979 }),
1980 database_changesets.iter().map(|block_changesets| {
1981 block_changesets.iter().map(|(address, account, _)| {
1982 (*address, Some(Some((*account).into())), [])
1983 })
1984 }),
1985 Vec::new(),
1986 ),
1987 first_block: first_database_block,
1988 ..Default::default()
1989 },
1990 Default::default(),
1991 )?;
1992 provider_rw.commit()?;
1993
1994 let provider = BlockchainProvider::new(factory)?;
1995
1996 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1997 let chain = NewCanonicalChain::Commit {
1998 new: vec![in_memory_blocks
1999 .first()
2000 .map(|block| {
2001 let senders = block.senders().expect("failed to recover senders");
2002 ExecutedBlock {
2003 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2004 block.clone(),
2005 senders,
2006 )),
2007 execution_output: Arc::new(BlockExecutionOutput {
2008 state: BundleState::new(
2009 in_memory_state.into_iter().map(|(address, (account, _))| {
2010 (address, None, Some(account.into()), Default::default())
2011 }),
2012 [in_memory_changesets.iter().map(|(address, account, _)| {
2013 (*address, Some(Some((*account).into())), Vec::new())
2014 })],
2015 [],
2016 ),
2017 result: BlockExecutionResult {
2018 receipts: Default::default(),
2019 requests: Default::default(),
2020 gas_used: 0,
2021 blob_gas_used: 0,
2022 },
2023 }),
2024 ..Default::default()
2025 }
2026 })
2027 .unwrap()],
2028 };
2029 provider.canonical_in_memory_state.update_chain(chain);
2030
2031 let consistent_provider = provider.consistent_provider()?;
2032
2033 assert_eq!(
2034 consistent_provider.account_block_changeset(last_database_block).unwrap(),
2035 database_changesets
2036 .into_iter()
2037 .next_back()
2038 .unwrap()
2039 .into_iter()
2040 .sorted_by_key(|(address, _, _)| *address)
2041 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2042 .collect::<Vec<_>>()
2043 );
2044 assert_eq!(
2045 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2046 in_memory_changesets
2047 .into_iter()
2048 .sorted_by_key(|(address, _, _)| *address)
2049 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2050 .collect::<Vec<_>>()
2051 );
2052
2053 Ok(())
2054 }
2055 #[test]
2056 fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
2057 use alloy_primitives::U256;
2058 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
2059 use reth_primitives_traits::StorageEntry;
2060 use reth_storage_api::StorageSettingsCache;
2061 use std::collections::HashMap;
2062
2063 let address = alloy_primitives::Address::with_last_byte(1);
2064 let account = reth_primitives_traits::Account {
2065 nonce: 1,
2066 balance: U256::from(1000),
2067 bytecode_hash: None,
2068 };
2069 let slot = U256::from(0x42);
2070 let slot_b256 = B256::from(slot);
2071
2072 let mut rng = generators::rng();
2073 let factory = create_test_provider_factory();
2074 factory.set_storage_settings_cache(StorageSettings::v1());
2075
2076 let blocks = random_block_range(
2077 &mut rng,
2078 0..=1,
2079 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
2080 );
2081
2082 let provider_rw = factory.provider_rw()?;
2083 provider_rw.append_blocks_with_state(
2084 blocks
2085 .into_iter()
2086 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2087 .collect(),
2088 &ExecutionOutcome {
2089 bundle: BundleState::new(
2090 [(address, None, Some(account.into()), {
2091 let mut s = HashMap::default();
2092 s.insert(slot, (U256::ZERO, U256::from(100)));
2093 s
2094 })],
2095 [
2096 Vec::new(),
2097 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2098 ],
2099 [],
2100 ),
2101 first_block: 0,
2102 ..Default::default()
2103 },
2104 Default::default(),
2105 )?;
2106
2107 provider_rw.tx_ref().put::<tables::PlainStorageState>(
2108 address,
2109 StorageEntry { key: slot_b256, value: U256::from(100) },
2110 )?;
2111 provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
2112
2113 provider_rw.commit()?;
2114
2115 let provider = BlockchainProvider::new(factory)?;
2116 let consistent_provider = provider.consistent_provider()?;
2117
2118 let outcome =
2119 consistent_provider.get_state(1..=1)?.expect("should return execution outcome");
2120
2121 let state = &outcome.bundle.state;
2122 let account_state = state.get(&address).expect("should have account in bundle state");
2123 let storage = &account_state.storage;
2124
2125 let storage_slot = storage.get(&slot).expect("should have the slot in storage");
2126
2127 assert_eq!(
2128 storage_slot.present_value,
2129 U256::from(100),
2130 "present_value should be 100 (the actual value in PlainStorageState)"
2131 );
2132
2133 Ok(())
2134 }
2135
2136 #[test]
2137 fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
2138 use alloy_primitives::U256;
2139 use reth_db_api::models::StorageSettings;
2140 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2141 use std::collections::HashMap;
2142
2143 let mut rng = generators::rng();
2144 let factory = create_test_provider_factory();
2145 factory.set_storage_settings_cache(StorageSettings::v1());
2146
2147 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
2148
2149 let address = alloy_primitives::Address::with_last_byte(1);
2150 let account = reth_primitives_traits::Account {
2151 nonce: 1,
2152 balance: U256::from(1000),
2153 bytecode_hash: None,
2154 };
2155 let slot = U256::from(0x42);
2156
2157 let provider_rw = factory.provider_rw()?;
2158 provider_rw.append_blocks_with_state(
2159 database_blocks
2160 .into_iter()
2161 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2162 .collect(),
2163 &ExecutionOutcome {
2164 bundle: BundleState::new(
2165 [(address, None, Some(account.into()), {
2166 let mut s = HashMap::default();
2167 s.insert(slot, (U256::ZERO, U256::from(100)));
2168 s
2169 })],
2170 [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
2171 [],
2172 ),
2173 first_block: 0,
2174 ..Default::default()
2175 },
2176 Default::default(),
2177 )?;
2178 provider_rw.commit()?;
2179
2180 let provider = BlockchainProvider::new(factory)?;
2181
2182 let in_mem_block = in_memory_blocks.first().unwrap();
2183 let senders = in_mem_block.senders().expect("failed to recover senders");
2184 let chain = NewCanonicalChain::Commit {
2185 new: vec![ExecutedBlock {
2186 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2187 in_mem_block.clone(),
2188 senders,
2189 )),
2190 execution_output: Arc::new(BlockExecutionOutput {
2191 state: BundleState::new(
2192 [(address, None, Some(account.into()), {
2193 let mut s = HashMap::default();
2194 s.insert(slot, (U256::from(100), U256::from(200)));
2195 s
2196 })],
2197 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2198 [],
2199 ),
2200 result: BlockExecutionResult {
2201 receipts: Default::default(),
2202 requests: Default::default(),
2203 gas_used: 0,
2204 blob_gas_used: 0,
2205 },
2206 }),
2207 ..Default::default()
2208 }],
2209 };
2210 provider.canonical_in_memory_state.update_chain(chain);
2211
2212 let consistent_provider = provider.consistent_provider()?;
2213
2214 let db_changeset = consistent_provider.storage_changeset(0)?;
2215 let mem_changeset = consistent_provider.storage_changeset(1)?;
2216
2217 let slot_b256 = B256::from(slot);
2218
2219 assert_eq!(db_changeset.len(), 1);
2220 assert_eq!(mem_changeset.len(), 1);
2221
2222 let db_key = db_changeset[0].1.key;
2223 let mem_key = mem_changeset[0].1.key;
2224
2225 assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2226 assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2227 assert_eq!(
2228 db_key, mem_key,
2229 "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2230 );
2231
2232 Ok(())
2233 }
2234
2235 #[test]
2236 fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2237 use alloy_primitives::U256;
2238 use reth_db_api::models::StorageSettings;
2239 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2240 use std::collections::HashMap;
2241
2242 let mut rng = generators::rng();
2243 let factory = create_test_provider_factory();
2244 factory.set_storage_settings_cache(StorageSettings::v1());
2245
2246 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2247
2248 let address = alloy_primitives::Address::with_last_byte(1);
2249 let account = reth_primitives_traits::Account {
2250 nonce: 1,
2251 balance: U256::from(1000),
2252 bytecode_hash: None,
2253 };
2254 let slot = U256::from(0x42);
2255
2256 let provider_rw = factory.provider_rw()?;
2257 provider_rw.append_blocks_with_state(
2258 database_blocks
2259 .into_iter()
2260 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2261 .collect(),
2262 &ExecutionOutcome {
2263 bundle: BundleState::new(
2264 [(address, None, Some(account.into()), {
2265 let mut s = HashMap::default();
2266 s.insert(slot, (U256::ZERO, U256::from(100)));
2267 s
2268 })],
2269 vec![
2270 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2271 vec![],
2272 ],
2273 [],
2274 ),
2275 first_block: 0,
2276 ..Default::default()
2277 },
2278 Default::default(),
2279 )?;
2280 provider_rw.commit()?;
2281
2282 let provider = BlockchainProvider::new(factory)?;
2283
2284 let in_mem_block = in_memory_blocks.first().unwrap();
2285 let senders = in_mem_block.senders().expect("failed to recover senders");
2286 let chain = NewCanonicalChain::Commit {
2287 new: vec![ExecutedBlock {
2288 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2289 in_mem_block.clone(),
2290 senders,
2291 )),
2292 execution_output: Arc::new(BlockExecutionOutput {
2293 state: BundleState::new(
2294 [(address, None, Some(account.into()), {
2295 let mut s = HashMap::default();
2296 s.insert(slot, (U256::from(100), U256::from(200)));
2297 s
2298 })],
2299 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2300 [],
2301 ),
2302 result: BlockExecutionResult {
2303 receipts: Default::default(),
2304 requests: Default::default(),
2305 gas_used: 0,
2306 blob_gas_used: 0,
2307 },
2308 }),
2309 ..Default::default()
2310 }],
2311 };
2312 provider.canonical_in_memory_state.update_chain(chain);
2313
2314 let consistent_provider = provider.consistent_provider()?;
2315
2316 let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2317
2318 assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2319
2320 let slot_b256 = B256::from(slot);
2321 let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2322
2323 assert_eq!(
2324 keys[0], keys[1],
2325 "same logical slot should produce identical keys whether from DB or memory"
2326 );
2327 assert_eq!(
2328 keys[0], slot_b256,
2329 "keys should be plain/unhashed when use_hashed_state is false"
2330 );
2331
2332 Ok(())
2333 }
2334}