1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12 HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_storage_api::{
27 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
28 StorageChangeSetReader, TryIntoHistoricalStateProvider,
29};
30use reth_storage_errors::provider::ProviderResult;
31use revm_database::states::PlainStorageRevert;
32use std::{
33 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
34 sync::Arc,
35};
36use tracing::trace;
37
38#[derive(Debug)]
45#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
47 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
49 head_block: Option<Arc<BlockState<N::Primitives>>>,
51 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
53}
54
55impl<N: ProviderNodeTypes> ConsistentProvider<N> {
56 pub fn new(
62 storage_provider_factory: ProviderFactory<N>,
63 state: CanonicalInMemoryState<N::Primitives>,
64 ) -> ProviderResult<Self> {
65 let head_block = state.head_state();
73 let storage_provider = storage_provider_factory.database_provider_ro()?;
74 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
75 }
76
77 fn convert_range_bounds<T>(
79 &self,
80 range: impl RangeBounds<T>,
81 end_unbounded: impl FnOnce() -> T,
82 ) -> (T, T)
83 where
84 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
85 {
86 let start = match range.start_bound() {
87 Bound::Included(&n) => n,
88 Bound::Excluded(&n) => n + T::from(1u8),
89 Bound::Unbounded => T::from(0u8),
90 };
91
92 let end = match range.end_bound() {
93 Bound::Included(&n) => n,
94 Bound::Excluded(&n) => n - T::from(1u8),
95 Bound::Unbounded => end_unbounded(),
96 };
97
98 (start, end)
99 }
100
101 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
103 trace!(target: "providers::blockchain", "Getting latest block state provider");
104
105 if let Some(state) = &self.head_block {
107 trace!(target: "providers::blockchain", "Using head state for latest state provider");
108 Ok(self.block_state_provider_ref(state)?.boxed())
109 } else {
110 trace!(target: "providers::blockchain", "Using database state for latest state provider");
111 Ok(self.storage_provider.latest())
112 }
113 }
114
115 fn history_by_block_hash_ref<'a>(
116 &'a self,
117 block_hash: BlockHash,
118 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
119 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
120
121 self.get_in_memory_or_storage_by_block(
122 block_hash.into(),
123 |_| self.storage_provider.history_by_block_hash(block_hash),
124 |block_state| {
125 let state_provider = self.block_state_provider_ref(block_state)?;
126 Ok(Box::new(state_provider))
127 },
128 )
129 }
130
131 fn state_by_block_number_ref<'a>(
133 &'a self,
134 number: BlockNumber,
135 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
136 let hash =
137 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
138 self.history_by_block_hash_ref(hash)
139 }
140
141 pub fn get_state(
145 &self,
146 range: RangeInclusive<BlockNumber>,
147 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
148 if range.is_empty() {
149 return Ok(None)
150 }
151 let start_block_number = *range.start();
152 let end_block_number = *range.end();
153
154 let mut block_bodies = Vec::new();
156 for block_num in range.clone() {
157 let block_body = self
158 .block_body_indices(block_num)?
159 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
160 block_bodies.push((block_num, block_body))
161 }
162
163 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
165 else {
166 return Ok(None)
167 };
168 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
169 return Ok(None)
170 };
171
172 let mut account_changeset = Vec::new();
173 for block_num in range.clone() {
174 let changeset =
175 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
176 account_changeset.extend(changeset);
177 }
178
179 let mut storage_changeset = Vec::new();
180 for block_num in range {
181 let changeset = self.storage_changeset(block_num)?;
182 storage_changeset.extend(changeset);
183 }
184
185 let (state, reverts) =
186 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
187
188 let mut receipt_iter =
189 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
190
191 let mut receipts = Vec::with_capacity(block_bodies.len());
192 for (_, block_body) in block_bodies {
194 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
195 for tx_num in block_body.tx_num_range() {
196 let receipt = receipt_iter
197 .next()
198 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
199 block_receipts.push(receipt);
200 }
201 receipts.push(block_receipts);
202 }
203
204 Ok(Some(ExecutionOutcome::new_init(
205 state,
206 reverts,
207 Vec::new(),
209 receipts,
210 start_block_number,
211 Vec::new(),
212 )))
213 }
214
215 fn populate_bundle_state(
219 &self,
220 account_changeset: Vec<(u64, AccountBeforeTx)>,
221 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
222 block_range_end: BlockNumber,
223 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
224 let mut state: BundleStateInit = HashMap::default();
225 let mut reverts: RevertsInit = HashMap::default();
226 let state_provider = self.state_by_block_number_ref(block_range_end)?;
227
228 for (block_number, account_before) in account_changeset.into_iter().rev() {
230 let AccountBeforeTx { info: old_info, address } = account_before;
231 match state.entry(address) {
232 hash_map::Entry::Vacant(entry) => {
233 let new_info = state_provider.basic_account(&address)?;
234 entry.insert((old_info, new_info, HashMap::default()));
235 }
236 hash_map::Entry::Occupied(mut entry) => {
237 entry.get_mut().0 = old_info;
239 }
240 }
241 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
243 }
244
245 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
247 let BlockNumberAddress((block_number, address)) = block_and_address;
248 let account_state = match state.entry(address) {
250 hash_map::Entry::Vacant(entry) => {
251 let present_info = state_provider.basic_account(&address)?;
252 entry.insert((present_info, present_info, HashMap::default()))
253 }
254 hash_map::Entry::Occupied(entry) => entry.into_mut(),
255 };
256
257 match account_state.2.entry(old_storage.key) {
259 hash_map::Entry::Vacant(entry) => {
260 let new_storage_value =
261 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
262 entry.insert((old_storage.value, new_storage_value));
263 }
264 hash_map::Entry::Occupied(mut entry) => {
265 entry.get_mut().0 = old_storage.value;
266 }
267 };
268
269 reverts
270 .entry(block_number)
271 .or_default()
272 .entry(address)
273 .or_default()
274 .1
275 .push(old_storage);
276 }
277
278 Ok((state, reverts))
279 }
280
281 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
293 &self,
294 range: impl RangeBounds<BlockNumber>,
295 fetch_db_range: F,
296 map_block_state_item: G,
297 mut predicate: P,
298 ) -> ProviderResult<Vec<T>>
299 where
300 F: FnOnce(
301 &DatabaseProviderRO<N::DB, N>,
302 RangeInclusive<BlockNumber>,
303 &mut P,
304 ) -> ProviderResult<Vec<T>>,
305 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
306 P: FnMut(&T) -> bool,
307 {
308 let mut in_memory_chain =
316 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
317 let db_provider = &self.storage_provider;
318
319 let (start, end) = self.convert_range_bounds(range, || {
320 in_memory_chain
322 .first()
323 .map(|b| b.number())
324 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
325 });
326
327 if start > end {
328 return Ok(vec![])
329 }
330
331 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
336 Some(lowest_memory_block) if lowest_memory_block <= end => {
337 let highest_memory_block =
338 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
339
340 let in_memory_range =
344 lowest_memory_block.max(start)..=end.min(highest_memory_block);
345
346 in_memory_chain.truncate(
349 in_memory_chain
350 .len()
351 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
352 );
353
354 let storage_range =
355 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
356
357 (Some((in_memory_chain, in_memory_range)), storage_range)
358 }
359 _ => {
360 drop(in_memory_chain);
362
363 (None, Some(start..=end))
364 }
365 };
366
367 let mut items = Vec::with_capacity((end - start + 1) as usize);
368
369 if let Some(storage_range) = storage_range {
370 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
371 items.append(&mut db_items);
372
373 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
376 return Ok(items)
377 }
378 }
379
380 if let Some((in_memory_chain, in_memory_range)) = in_memory {
381 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
382 debug_assert!(num == block.number());
383 if let Some(item) = map_block_state_item(block, &mut predicate) {
384 items.push(item);
385 } else {
386 break
387 }
388 }
389 }
390
391 Ok(items)
392 }
393
394 fn block_state_provider_ref(
396 &self,
397 state: &BlockState<N::Primitives>,
398 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
399 let anchor_hash = state.anchor().hash;
400 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
401 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
402 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
403 }
404
405 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
411 &self,
412 range: impl RangeBounds<BlockNumber>,
413 fetch_from_db: S,
414 fetch_from_block_state: M,
415 ) -> ProviderResult<Vec<R>>
416 where
417 S: FnOnce(
418 &DatabaseProviderRO<N::DB, N>,
419 RangeInclusive<TxNumber>,
420 ) -> ProviderResult<Vec<R>>,
421 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
422 {
423 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
424 let provider = &self.storage_provider;
425
426 let last_database_block_number = in_mem_chain
429 .last()
430 .map(|b| Ok(b.anchor().number))
431 .unwrap_or_else(|| provider.last_block_number())?;
432
433 let last_block_body_index = provider
436 .block_body_indices(last_database_block_number)?
437 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
438 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
439
440 let (start, end) = self.convert_range_bounds(range, || {
441 in_mem_chain
442 .iter()
443 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
444 .sum::<u64>() +
445 last_block_body_index.last_tx_num()
446 });
447
448 if start > end {
449 return Ok(vec![])
450 }
451
452 let mut tx_range = start..=end;
453
454 if *tx_range.end() < in_memory_tx_num {
457 return fetch_from_db(provider, tx_range);
458 }
459
460 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
461
462 if *tx_range.start() < in_memory_tx_num {
464 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
466
467 tx_range = in_memory_tx_num..=*tx_range.end();
469
470 items.extend(fetch_from_db(provider, db_range)?);
471 }
472
473 for block_state in in_mem_chain.iter().rev() {
475 let block_tx_count =
476 block_state.block_ref().recovered_block().body().transactions().len();
477 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
478
479 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
482 in_memory_tx_num += block_tx_count as u64;
483 continue
484 }
485
486 let skip = (tx_range.start() - in_memory_tx_num) as usize;
488
489 items.extend(fetch_from_block_state(
490 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
491 block_state,
492 )?);
493
494 in_memory_tx_num += block_tx_count as u64;
495
496 if in_memory_tx_num > *tx_range.end() {
498 break
499 }
500
501 tx_range = in_memory_tx_num..=*tx_range.end();
503 }
504
505 Ok(items)
506 }
507
508 fn get_in_memory_or_storage_by_tx<S, M, R>(
511 &self,
512 id: HashOrNumber,
513 fetch_from_db: S,
514 fetch_from_block_state: M,
515 ) -> ProviderResult<Option<R>>
516 where
517 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
518 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
519 {
520 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
521 let provider = &self.storage_provider;
522
523 let last_database_block_number = in_mem_chain
526 .last()
527 .map(|b| Ok(b.anchor().number))
528 .unwrap_or_else(|| provider.last_block_number())?;
529
530 let last_block_body_index = provider
533 .block_body_indices(last_database_block_number)?
534 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
535 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
536
537 if let HashOrNumber::Number(id) = id &&
540 id < in_memory_tx_num
541 {
542 return fetch_from_db(provider)
543 }
544
545 for block_state in in_mem_chain.iter().rev() {
547 let executed_block = block_state.block_ref();
548 let block = executed_block.recovered_block();
549
550 for tx_index in 0..block.body().transactions().len() {
551 match id {
552 HashOrNumber::Hash(tx_hash) => {
553 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
554 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
555 }
556 }
557 HashOrNumber::Number(id) => {
558 if id == in_memory_tx_num {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 }
563
564 in_memory_tx_num += 1;
565 }
566 }
567
568 if let HashOrNumber::Hash(_) = id {
570 return fetch_from_db(provider)
571 }
572
573 Ok(None)
574 }
575
576 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
578 &self,
579 id: BlockHashOrNumber,
580 fetch_from_db: S,
581 fetch_from_block_state: M,
582 ) -> ProviderResult<R>
583 where
584 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
585 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
586 {
587 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
588 return fetch_from_block_state(block_state)
589 }
590 fetch_from_db(&self.storage_provider)
591 }
592
593 pub(crate) fn into_state_provider_at_block_hash(
595 self,
596 block_hash: BlockHash,
597 ) -> ProviderResult<Box<dyn StateProvider>> {
598 let Self { storage_provider, head_block, .. } = self;
599 let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
600 let block_number = storage_provider
601 .block_number(block_hash)?
602 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
603 storage_provider.try_into_history_at_block(block_number)
604 };
605 if let Some(Some(block_state)) =
606 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
607 {
608 let anchor_hash = block_state.anchor().hash;
609 let latest_historical = into_history_at_block_hash(anchor_hash)?;
610 return Ok(Box::new(block_state.state_provider(latest_historical)));
611 }
612 into_history_at_block_hash(block_hash)
613 }
614}
615
616impl<N: ProviderNodeTypes> ConsistentProvider<N> {
617 #[inline]
626 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
627 let latest = self.best_block_number()?;
628 if block_number > latest {
629 Err(ProviderError::HeaderNotFound(block_number.into()))
630 } else {
631 Ok(())
632 }
633 }
634}
635
636impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
637 type Primitives = N::Primitives;
638}
639
640impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
641 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
642 self.storage_provider.static_file_provider()
643 }
644}
645
646impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
647 type Header = HeaderTy<N>;
648
649 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
650 self.get_in_memory_or_storage_by_block(
651 block_hash.into(),
652 |db_provider| db_provider.header(block_hash),
653 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
654 )
655 }
656
657 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
658 self.get_in_memory_or_storage_by_block(
659 num.into(),
660 |db_provider| db_provider.header_by_number(num),
661 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
662 )
663 }
664
665 fn header_td(&self, hash: BlockHash) -> ProviderResult<Option<U256>> {
666 if let Some(num) = self.block_number(hash)? {
667 self.header_td_by_number(num)
668 } else {
669 Ok(None)
670 }
671 }
672
673 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
674 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
675 {
676 if let Some(last_finalized_num_hash) =
683 self.canonical_in_memory_state.get_finalized_num_hash()
684 {
685 last_finalized_num_hash.number
686 } else {
687 self.last_block_number()?
688 }
689 } else {
690 number
692 };
693 self.storage_provider.header_td_by_number(number)
694 }
695
696 fn headers_range(
697 &self,
698 range: impl RangeBounds<BlockNumber>,
699 ) -> ProviderResult<Vec<Self::Header>> {
700 self.get_in_memory_or_storage_by_block_range_while(
701 range,
702 |db_provider, range, _| db_provider.headers_range(range),
703 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
704 |_| true,
705 )
706 }
707
708 fn sealed_header(
709 &self,
710 number: BlockNumber,
711 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
712 self.get_in_memory_or_storage_by_block(
713 number.into(),
714 |db_provider| db_provider.sealed_header(number),
715 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
716 )
717 }
718
719 fn sealed_headers_range(
720 &self,
721 range: impl RangeBounds<BlockNumber>,
722 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
723 self.get_in_memory_or_storage_by_block_range_while(
724 range,
725 |db_provider, range, _| db_provider.sealed_headers_range(range),
726 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
727 |_| true,
728 )
729 }
730
731 fn sealed_headers_while(
732 &self,
733 range: impl RangeBounds<BlockNumber>,
734 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
735 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
736 self.get_in_memory_or_storage_by_block_range_while(
737 range,
738 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
739 |block_state, predicate| {
740 let header = block_state.block_ref().recovered_block().sealed_header();
741 predicate(header).then(|| header.clone())
742 },
743 predicate,
744 )
745 }
746}
747
748impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
749 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
750 self.get_in_memory_or_storage_by_block(
751 number.into(),
752 |db_provider| db_provider.block_hash(number),
753 |block_state| Ok(Some(block_state.hash())),
754 )
755 }
756
757 fn canonical_hashes_range(
758 &self,
759 start: BlockNumber,
760 end: BlockNumber,
761 ) -> ProviderResult<Vec<B256>> {
762 self.get_in_memory_or_storage_by_block_range_while(
763 start..end,
764 |db_provider, inclusive_range, _| {
765 db_provider
766 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
767 },
768 |block_state, _| Some(block_state.hash()),
769 |_| true,
770 )
771 }
772}
773
774impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
775 fn chain_info(&self) -> ProviderResult<ChainInfo> {
776 let best_number = self.best_block_number()?;
777 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
778 }
779
780 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
781 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
782 }
783
784 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
785 self.storage_provider.last_block_number()
786 }
787
788 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
789 self.get_in_memory_or_storage_by_block(
790 hash.into(),
791 |db_provider| db_provider.block_number(hash),
792 |block_state| Ok(Some(block_state.number())),
793 )
794 }
795}
796
797impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
798 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
799 Ok(self.canonical_in_memory_state.pending_block_num_hash())
800 }
801
802 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
803 Ok(self.canonical_in_memory_state.get_safe_num_hash())
804 }
805
806 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
807 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
808 }
809}
810
811impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
812 type Block = BlockTy<N>;
813
814 fn find_block_by_hash(
815 &self,
816 hash: B256,
817 source: BlockSource,
818 ) -> ProviderResult<Option<Self::Block>> {
819 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
820 let Some(block) = self.get_in_memory_or_storage_by_block(
821 hash.into(),
822 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
823 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824 )?
825 {
826 return Ok(Some(block))
827 }
828
829 if matches!(source, BlockSource::Pending | BlockSource::Any) {
830 return Ok(self
831 .canonical_in_memory_state
832 .pending_block()
833 .filter(|b| b.hash() == hash)
834 .map(|b| b.into_block()))
835 }
836
837 Ok(None)
838 }
839
840 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
841 self.get_in_memory_or_storage_by_block(
842 id,
843 |db_provider| db_provider.block(id),
844 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
845 )
846 }
847
848 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
849 Ok(self.canonical_in_memory_state.pending_recovered_block())
850 }
851
852 fn pending_block_and_receipts(
853 &self,
854 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
855 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
856 }
857
858 fn recovered_block(
865 &self,
866 id: BlockHashOrNumber,
867 transaction_kind: TransactionVariant,
868 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
869 self.get_in_memory_or_storage_by_block(
870 id,
871 |db_provider| db_provider.recovered_block(id, transaction_kind),
872 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
873 )
874 }
875
876 fn sealed_block_with_senders(
877 &self,
878 id: BlockHashOrNumber,
879 transaction_kind: TransactionVariant,
880 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
881 self.get_in_memory_or_storage_by_block(
882 id,
883 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
884 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
885 )
886 }
887
888 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
889 self.get_in_memory_or_storage_by_block_range_while(
890 range,
891 |db_provider, range, _| db_provider.block_range(range),
892 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
893 |_| true,
894 )
895 }
896
897 fn block_with_senders_range(
898 &self,
899 range: RangeInclusive<BlockNumber>,
900 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
901 self.get_in_memory_or_storage_by_block_range_while(
902 range,
903 |db_provider, range, _| db_provider.block_with_senders_range(range),
904 |block_state, _| Some(block_state.block().recovered_block().clone()),
905 |_| true,
906 )
907 }
908
909 fn recovered_block_range(
910 &self,
911 range: RangeInclusive<BlockNumber>,
912 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
913 self.get_in_memory_or_storage_by_block_range_while(
914 range,
915 |db_provider, range, _| db_provider.recovered_block_range(range),
916 |block_state, _| Some(block_state.block().recovered_block().clone()),
917 |_| true,
918 )
919 }
920
921 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
922 self.get_in_memory_or_storage_by_tx(
923 id.into(),
924 |db_provider| db_provider.block_by_transaction_id(id),
925 |_, _, block_state| Ok(Some(block_state.number())),
926 )
927 }
928}
929
930impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
931 type Transaction = TxTy<N>;
932
933 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
934 self.get_in_memory_or_storage_by_tx(
935 tx_hash.into(),
936 |db_provider| db_provider.transaction_id(tx_hash),
937 |_, tx_number, _| Ok(Some(tx_number)),
938 )
939 }
940
941 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
942 self.get_in_memory_or_storage_by_tx(
943 id.into(),
944 |provider| provider.transaction_by_id(id),
945 |tx_index, _, block_state| {
946 Ok(block_state
947 .block_ref()
948 .recovered_block()
949 .body()
950 .transactions()
951 .get(tx_index)
952 .cloned())
953 },
954 )
955 }
956
957 fn transaction_by_id_unhashed(
958 &self,
959 id: TxNumber,
960 ) -> ProviderResult<Option<Self::Transaction>> {
961 self.get_in_memory_or_storage_by_tx(
962 id.into(),
963 |provider| provider.transaction_by_id_unhashed(id),
964 |tx_index, _, block_state| {
965 Ok(block_state
966 .block_ref()
967 .recovered_block()
968 .body()
969 .transactions()
970 .get(tx_index)
971 .cloned())
972 },
973 )
974 }
975
976 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
977 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
978 return Ok(Some(tx))
979 }
980
981 self.storage_provider.transaction_by_hash(hash)
982 }
983
984 fn transaction_by_hash_with_meta(
985 &self,
986 tx_hash: TxHash,
987 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
988 if let Some((tx, meta)) =
989 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
990 {
991 return Ok(Some((tx, meta)))
992 }
993
994 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
995 }
996
997 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
998 self.get_in_memory_or_storage_by_tx(
999 id.into(),
1000 |provider| provider.transaction_block(id),
1001 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
1002 )
1003 }
1004
1005 fn transactions_by_block(
1006 &self,
1007 id: BlockHashOrNumber,
1008 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1009 self.get_in_memory_or_storage_by_block(
1010 id,
1011 |provider| provider.transactions_by_block(id),
1012 |block_state| {
1013 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
1014 },
1015 )
1016 }
1017
1018 fn transactions_by_block_range(
1019 &self,
1020 range: impl RangeBounds<BlockNumber>,
1021 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1022 self.get_in_memory_or_storage_by_block_range_while(
1023 range,
1024 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1025 |block_state, _| {
1026 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1027 },
1028 |_| true,
1029 )
1030 }
1031
1032 fn transactions_by_tx_range(
1033 &self,
1034 range: impl RangeBounds<TxNumber>,
1035 ) -> ProviderResult<Vec<Self::Transaction>> {
1036 self.get_in_memory_or_storage_by_tx_range(
1037 range,
1038 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1039 |index_range, block_state| {
1040 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1041 .to_vec())
1042 },
1043 )
1044 }
1045
1046 fn senders_by_tx_range(
1047 &self,
1048 range: impl RangeBounds<TxNumber>,
1049 ) -> ProviderResult<Vec<Address>> {
1050 self.get_in_memory_or_storage_by_tx_range(
1051 range,
1052 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1053 |index_range, block_state| {
1054 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1055 },
1056 )
1057 }
1058
1059 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1060 self.get_in_memory_or_storage_by_tx(
1061 id.into(),
1062 |provider| provider.transaction_sender(id),
1063 |tx_index, _, block_state| {
1064 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1065 },
1066 )
1067 }
1068}
1069
1070impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1071 type Receipt = ReceiptTy<N>;
1072
1073 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1074 self.get_in_memory_or_storage_by_tx(
1075 id.into(),
1076 |provider| provider.receipt(id),
1077 |tx_index, _, block_state| {
1078 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1079 },
1080 )
1081 }
1082
1083 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1084 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1085 let executed_block = block_state.block_ref();
1086 let block = executed_block.recovered_block();
1087 let receipts = block_state.executed_block_receipts();
1088
1089 debug_assert_eq!(
1091 block.body().transactions().len(),
1092 receipts.len(),
1093 "Mismatch between transaction and receipt count"
1094 );
1095
1096 if let Some(tx_index) =
1097 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1098 {
1099 return Ok(receipts.get(tx_index).cloned());
1101 }
1102 }
1103
1104 self.storage_provider.receipt_by_hash(hash)
1105 }
1106
1107 fn receipts_by_block(
1108 &self,
1109 block: BlockHashOrNumber,
1110 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1111 self.get_in_memory_or_storage_by_block(
1112 block,
1113 |db_provider| db_provider.receipts_by_block(block),
1114 |block_state| Ok(Some(block_state.executed_block_receipts())),
1115 )
1116 }
1117
1118 fn receipts_by_tx_range(
1119 &self,
1120 range: impl RangeBounds<TxNumber>,
1121 ) -> ProviderResult<Vec<Self::Receipt>> {
1122 self.get_in_memory_or_storage_by_tx_range(
1123 range,
1124 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1125 |index_range, block_state| {
1126 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1127 },
1128 )
1129 }
1130
1131 fn receipts_by_block_range(
1132 &self,
1133 block_range: RangeInclusive<BlockNumber>,
1134 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1135 self.storage_provider.receipts_by_block_range(block_range)
1136 }
1137}
1138
1139impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1140 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1141 match block {
1142 BlockId::Hash(rpc_block_hash) => {
1143 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1144 if receipts.is_none() &&
1145 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1146 let Some(state) = self
1147 .head_block
1148 .as_ref()
1149 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1150 {
1151 receipts = Some(state.executed_block_receipts());
1152 }
1153 Ok(receipts)
1154 }
1155 BlockId::Number(num_tag) => match num_tag {
1156 BlockNumberOrTag::Pending => Ok(self
1157 .canonical_in_memory_state
1158 .pending_state()
1159 .map(|block_state| block_state.executed_block_receipts())),
1160 _ => {
1161 if let Some(num) = self.convert_block_number(num_tag)? {
1162 self.receipts_by_block(num.into())
1163 } else {
1164 Ok(None)
1165 }
1166 }
1167 },
1168 }
1169 }
1170}
1171
1172impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1173 fn block_body_indices(
1174 &self,
1175 number: BlockNumber,
1176 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1177 self.get_in_memory_or_storage_by_block(
1178 number.into(),
1179 |db_provider| db_provider.block_body_indices(number),
1180 |block_state| {
1181 let last_storage_block_number = block_state.anchor().number;
1183 let mut stored_indices = self
1184 .storage_provider
1185 .block_body_indices(last_storage_block_number)?
1186 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1187
1188 stored_indices.first_tx_num = stored_indices.next_tx_num();
1190 stored_indices.tx_count = 0;
1191
1192 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1194 let block_tx_count =
1195 state.block_ref().recovered_block().body().transactions().len() as u64;
1196 if state.block_ref().recovered_block().number() == number {
1197 stored_indices.tx_count = block_tx_count;
1198 } else {
1199 stored_indices.first_tx_num += block_tx_count;
1200 }
1201 }
1202
1203 Ok(Some(stored_indices))
1204 },
1205 )
1206 }
1207
1208 fn block_body_indices_range(
1209 &self,
1210 range: RangeInclusive<BlockNumber>,
1211 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1212 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1213 }
1214}
1215
1216impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1217 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1218 self.storage_provider.get_stage_checkpoint(id)
1219 }
1220
1221 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1222 self.storage_provider.get_stage_checkpoint_progress(id)
1223 }
1224
1225 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1226 self.storage_provider.get_all_checkpoints()
1227 }
1228}
1229
1230impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1231 fn get_prune_checkpoint(
1232 &self,
1233 segment: PruneSegment,
1234 ) -> ProviderResult<Option<PruneCheckpoint>> {
1235 self.storage_provider.get_prune_checkpoint(segment)
1236 }
1237
1238 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1239 self.storage_provider.get_prune_checkpoints()
1240 }
1241}
1242
1243impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1244 type ChainSpec = N::ChainSpec;
1245
1246 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1247 ChainSpecProvider::chain_spec(&self.storage_provider)
1248 }
1249}
1250
1251impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1252 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1253 match id {
1254 BlockId::Number(num) => self.block_by_number_or_tag(num),
1255 BlockId::Hash(hash) => {
1256 if Some(true) == hash.require_canonical {
1261 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1263 } else {
1264 self.block_by_hash(hash.block_hash)
1265 }
1266 }
1267 }
1268 }
1269
1270 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1271 Ok(match id {
1272 BlockNumberOrTag::Latest => {
1273 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1274 }
1275 BlockNumberOrTag::Finalized => {
1276 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1277 }
1278 BlockNumberOrTag::Safe => {
1279 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1280 }
1281 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1282 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1283
1284 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1285 })
1286 }
1287
1288 fn sealed_header_by_number_or_tag(
1289 &self,
1290 id: BlockNumberOrTag,
1291 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1292 match id {
1293 BlockNumberOrTag::Latest => {
1294 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1295 }
1296 BlockNumberOrTag::Finalized => {
1297 Ok(self.canonical_in_memory_state.get_finalized_header())
1298 }
1299 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1300 BlockNumberOrTag::Earliest => self
1301 .header_by_number(self.earliest_block_number()?)?
1302 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1303 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1304 BlockNumberOrTag::Number(num) => self
1305 .header_by_number(num)?
1306 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1307 }
1308 }
1309
1310 fn sealed_header_by_id(
1311 &self,
1312 id: BlockId,
1313 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1314 Ok(match id {
1315 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1316 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1317 })
1318 }
1319
1320 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1321 Ok(match id {
1322 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1323 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1324 })
1325 }
1326}
1327
1328impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1329 fn storage_changeset(
1330 &self,
1331 block_number: BlockNumber,
1332 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1333 if let Some(state) =
1334 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1335 {
1336 let changesets = state
1337 .block()
1338 .execution_output
1339 .bundle
1340 .reverts
1341 .clone()
1342 .to_plain_state_reverts()
1343 .storage
1344 .into_iter()
1345 .flatten()
1346 .flat_map(|revert: PlainStorageRevert| {
1347 revert.storage_revert.into_iter().map(move |(key, value)| {
1348 (
1349 BlockNumberAddress((block_number, revert.address)),
1350 StorageEntry { key: key.into(), value: value.to_previous_value() },
1351 )
1352 })
1353 })
1354 .collect();
1355 Ok(changesets)
1356 } else {
1357 let storage_history_exists = self
1361 .storage_provider
1362 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1363 .and_then(|checkpoint| {
1364 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1369 })
1370 .unwrap_or(true);
1371
1372 if !storage_history_exists {
1373 return Err(ProviderError::StateAtBlockPruned(block_number))
1374 }
1375
1376 self.storage_provider.storage_changeset(block_number)
1377 }
1378 }
1379}
1380
1381impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1382 fn account_block_changeset(
1383 &self,
1384 block_number: BlockNumber,
1385 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1386 if let Some(state) =
1387 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1388 {
1389 let changesets = state
1390 .block_ref()
1391 .execution_output
1392 .bundle
1393 .reverts
1394 .clone()
1395 .to_plain_state_reverts()
1396 .accounts
1397 .into_iter()
1398 .flatten()
1399 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1400 .collect();
1401 Ok(changesets)
1402 } else {
1403 let account_history_exists = self
1407 .storage_provider
1408 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1409 .and_then(|checkpoint| {
1410 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1415 })
1416 .unwrap_or(true);
1417
1418 if !account_history_exists {
1419 return Err(ProviderError::StateAtBlockPruned(block_number))
1420 }
1421
1422 self.storage_provider.account_block_changeset(block_number)
1423 }
1424 }
1425
1426 fn get_account_before_block(
1427 &self,
1428 block_number: BlockNumber,
1429 address: Address,
1430 ) -> ProviderResult<Option<AccountBeforeTx>> {
1431 if let Some(state) =
1432 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1433 {
1434 let changeset = state
1436 .block_ref()
1437 .execution_output
1438 .bundle
1439 .reverts
1440 .clone()
1441 .to_plain_state_reverts()
1442 .accounts
1443 .into_iter()
1444 .flatten()
1445 .find(|(addr, _)| addr == &address)
1446 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1447 Ok(changeset)
1448 } else {
1449 let account_history_exists = self
1452 .storage_provider
1453 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1454 .and_then(|checkpoint| {
1455 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1460 })
1461 .unwrap_or(true);
1462
1463 if !account_history_exists {
1464 return Err(ProviderError::StateAtBlockPruned(block_number))
1465 }
1466
1467 self.storage_provider.get_account_before_block(block_number, address)
1469 }
1470 }
1471}
1472
1473impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1474 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1476 let state_provider = self.latest_ref()?;
1478 state_provider.basic_account(address)
1479 }
1480}
1481
1482impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1483 type Receipt = ReceiptTy<N>;
1484
1485 fn get_state(
1495 &self,
1496 block: BlockNumber,
1497 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1498 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1499 let state = state.block_ref().execution_outcome().clone();
1500 Ok(Some(state))
1501 } else {
1502 Self::get_state(self, block..=block)
1503 }
1504 }
1505}
1506
1507#[cfg(test)]
1508mod tests {
1509 use crate::{
1510 providers::blockchain_provider::BlockchainProvider,
1511 test_utils::create_test_provider_factory, BlockWriter,
1512 };
1513 use alloy_eips::BlockHashOrNumber;
1514 use alloy_primitives::B256;
1515 use itertools::Itertools;
1516 use rand::Rng;
1517 use reth_chain_state::{
1518 ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, NewCanonicalChain,
1519 };
1520 use reth_db_api::models::AccountBeforeTx;
1521 use reth_ethereum_primitives::Block;
1522 use reth_execution_types::ExecutionOutcome;
1523 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1524 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1525 use reth_testing_utils::generators::{
1526 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1527 };
1528 use revm_database::BundleState;
1529 use std::{
1530 ops::{Bound, Range, RangeBounds},
1531 sync::Arc,
1532 };
1533
1534 const TEST_BLOCKS_COUNT: usize = 5;
1535
1536 fn random_blocks(
1537 rng: &mut impl Rng,
1538 database_blocks: usize,
1539 in_memory_blocks: usize,
1540 requests_count: Option<Range<u8>>,
1541 withdrawals_count: Option<Range<u8>>,
1542 tx_count: impl RangeBounds<u8>,
1543 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1544 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1545
1546 let tx_start = match tx_count.start_bound() {
1547 Bound::Included(&n) | Bound::Excluded(&n) => n,
1548 Bound::Unbounded => u8::MIN,
1549 };
1550 let tx_end = match tx_count.end_bound() {
1551 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1552 Bound::Unbounded => u8::MAX,
1553 };
1554
1555 let blocks = random_block_range(
1556 rng,
1557 0..=block_range,
1558 BlockRangeParams {
1559 parent: Some(B256::ZERO),
1560 tx_count: tx_start..tx_end,
1561 requests_count,
1562 withdrawals_count,
1563 },
1564 );
1565 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1566 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1567 }
1568
1569 #[test]
1570 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1571 let mut rng = generators::rng();
1573 let factory = create_test_provider_factory();
1574
1575 let blocks = random_block_range(
1577 &mut rng,
1578 0..=10,
1579 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1580 );
1581 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1582
1583 let provider_rw = factory.provider_rw()?;
1585 for block in database_blocks {
1586 provider_rw.insert_block(
1587 block.clone().try_recover().expect("failed to seal block with senders"),
1588 )?;
1589 }
1590 provider_rw.commit()?;
1591
1592 let provider = BlockchainProvider::new(factory)?;
1594 let consistent_provider = provider.consistent_provider()?;
1595
1596 let first_db_block = database_blocks.first().unwrap();
1598 let first_in_mem_block = in_memory_blocks.first().unwrap();
1599 let last_in_mem_block = in_memory_blocks.last().unwrap();
1600
1601 assert_eq!(
1603 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1604 None
1605 );
1606 assert_eq!(
1607 consistent_provider
1608 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1609 None
1610 );
1611 assert_eq!(
1613 consistent_provider
1614 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1615 None
1616 );
1617
1618 let in_memory_block_senders =
1620 first_in_mem_block.senders().expect("failed to recover senders");
1621 let chain = NewCanonicalChain::Commit {
1622 new: vec![ExecutedBlockWithTrieUpdates::new(
1623 Arc::new(RecoveredBlock::new_sealed(
1624 first_in_mem_block.clone(),
1625 in_memory_block_senders,
1626 )),
1627 Default::default(),
1628 Default::default(),
1629 ExecutedTrieUpdates::empty(),
1630 )],
1631 };
1632 consistent_provider.canonical_in_memory_state.update_chain(chain);
1633 let consistent_provider = provider.consistent_provider()?;
1634
1635 assert_eq!(
1637 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1638 Some(first_in_mem_block.clone().into_block())
1639 );
1640 assert_eq!(
1641 consistent_provider
1642 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1643 Some(first_in_mem_block.clone().into_block())
1644 );
1645
1646 assert_eq!(
1648 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1649 Some(first_db_block.clone().into_block())
1650 );
1651 assert_eq!(
1652 consistent_provider
1653 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1654 Some(first_db_block.clone().into_block())
1655 );
1656
1657 assert_eq!(
1659 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1660 None
1661 );
1662
1663 provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1665 block: ExecutedBlock {
1666 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1667 last_in_mem_block.clone(),
1668 Default::default(),
1669 )),
1670 execution_output: Default::default(),
1671 hashed_state: Default::default(),
1672 },
1673 trie: ExecutedTrieUpdates::empty(),
1674 });
1675
1676 assert_eq!(
1678 consistent_provider
1679 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1680 Some(last_in_mem_block.clone_block())
1681 );
1682
1683 Ok(())
1684 }
1685
1686 #[test]
1687 fn test_block_reader_block() -> eyre::Result<()> {
1688 let mut rng = generators::rng();
1690 let factory = create_test_provider_factory();
1691
1692 let blocks = random_block_range(
1694 &mut rng,
1695 0..=10,
1696 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1697 );
1698 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1699
1700 let provider_rw = factory.provider_rw()?;
1702 for block in database_blocks {
1703 provider_rw.insert_block(
1704 block.clone().try_recover().expect("failed to seal block with senders"),
1705 )?;
1706 }
1707 provider_rw.commit()?;
1708
1709 let provider = BlockchainProvider::new(factory)?;
1711 let consistent_provider = provider.consistent_provider()?;
1712
1713 let first_in_mem_block = in_memory_blocks.first().unwrap();
1715 let first_db_block = database_blocks.first().unwrap();
1717
1718 assert_eq!(
1720 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1721 None
1722 );
1723 assert_eq!(
1724 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1725 None
1726 );
1727
1728 let in_memory_block_senders =
1730 first_in_mem_block.senders().expect("failed to recover senders");
1731 let chain = NewCanonicalChain::Commit {
1732 new: vec![ExecutedBlockWithTrieUpdates::new(
1733 Arc::new(RecoveredBlock::new_sealed(
1734 first_in_mem_block.clone(),
1735 in_memory_block_senders,
1736 )),
1737 Default::default(),
1738 Default::default(),
1739 ExecutedTrieUpdates::empty(),
1740 )],
1741 };
1742 consistent_provider.canonical_in_memory_state.update_chain(chain);
1743
1744 let consistent_provider = provider.consistent_provider()?;
1745
1746 assert_eq!(
1748 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1749 Some(first_in_mem_block.clone().into_block())
1750 );
1751 assert_eq!(
1752 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1753 Some(first_in_mem_block.clone().into_block())
1754 );
1755
1756 assert_eq!(
1758 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1759 Some(first_db_block.clone().into_block())
1760 );
1761 assert_eq!(
1762 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1763 Some(first_db_block.clone().into_block())
1764 );
1765
1766 Ok(())
1767 }
1768
1769 #[test]
1770 fn test_changeset_reader() -> eyre::Result<()> {
1771 let mut rng = generators::rng();
1772
1773 let (database_blocks, in_memory_blocks) =
1774 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1775
1776 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1777 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1778 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1779
1780 let accounts = random_eoa_accounts(&mut rng, 2);
1781
1782 let (database_changesets, database_state) = random_changeset_range(
1783 &mut rng,
1784 &database_blocks,
1785 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1786 0..0,
1787 0..0,
1788 );
1789 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1790 &mut rng,
1791 &in_memory_blocks,
1792 database_state
1793 .iter()
1794 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1795 0..0,
1796 0..0,
1797 );
1798
1799 let factory = create_test_provider_factory();
1800
1801 let provider_rw = factory.provider_rw()?;
1802 provider_rw.append_blocks_with_state(
1803 database_blocks
1804 .into_iter()
1805 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1806 .collect(),
1807 &ExecutionOutcome {
1808 bundle: BundleState::new(
1809 database_state.into_iter().map(|(address, (account, _))| {
1810 (address, None, Some(account.into()), Default::default())
1811 }),
1812 database_changesets
1813 .iter()
1814 .map(|block_changesets| {
1815 block_changesets.iter().map(|(address, account, _)| {
1816 (*address, Some(Some((*account).into())), [])
1817 })
1818 })
1819 .collect::<Vec<_>>(),
1820 Vec::new(),
1821 ),
1822 first_block: first_database_block,
1823 ..Default::default()
1824 },
1825 Default::default(),
1826 )?;
1827 provider_rw.commit()?;
1828
1829 let provider = BlockchainProvider::new(factory)?;
1830
1831 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1832 let chain = NewCanonicalChain::Commit {
1833 new: vec![in_memory_blocks
1834 .first()
1835 .map(|block| {
1836 let senders = block.senders().expect("failed to recover senders");
1837 ExecutedBlockWithTrieUpdates::new(
1838 Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1839 Arc::new(ExecutionOutcome {
1840 bundle: BundleState::new(
1841 in_memory_state.into_iter().map(|(address, (account, _))| {
1842 (address, None, Some(account.into()), Default::default())
1843 }),
1844 [in_memory_changesets.iter().map(|(address, account, _)| {
1845 (*address, Some(Some((*account).into())), Vec::new())
1846 })],
1847 [],
1848 ),
1849 first_block: first_in_memory_block,
1850 ..Default::default()
1851 }),
1852 Default::default(),
1853 ExecutedTrieUpdates::empty(),
1854 )
1855 })
1856 .unwrap()],
1857 };
1858 provider.canonical_in_memory_state.update_chain(chain);
1859
1860 let consistent_provider = provider.consistent_provider()?;
1861
1862 assert_eq!(
1863 consistent_provider.account_block_changeset(last_database_block).unwrap(),
1864 database_changesets
1865 .into_iter()
1866 .next_back()
1867 .unwrap()
1868 .into_iter()
1869 .sorted_by_key(|(address, _, _)| *address)
1870 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1871 .collect::<Vec<_>>()
1872 );
1873 assert_eq!(
1874 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1875 in_memory_changesets
1876 .into_iter()
1877 .sorted_by_key(|(address, _, _)| *address)
1878 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1879 .collect::<Vec<_>>()
1880 );
1881
1882 Ok(())
1883 }
1884}