1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber, BlockId, BlockNumHash,
12 BlockNumberOrTag, HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::{ChainInfo, EthereumHardforks};
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives::{Account, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry};
24use reth_primitives_traits::BlockBody;
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_storage_api::{
28 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
29 StateProvider, StorageChangeSetReader,
30};
31use reth_storage_errors::provider::ProviderResult;
32use revm_database::states::PlainStorageRevert;
33use std::{
34 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
35 sync::Arc,
36};
37use tracing::trace;
38
/// Read-only provider that answers queries from two sources: a database provider and a chain of
/// in-memory block states.
///
/// Both sources are captured once in [`ConsistentProvider::new`]; the methods on this type read
/// from those captured values.
#[derive(Debug)]
#[doc(hidden)]
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Read-only database provider obtained from the [`ProviderFactory`].
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head of the in-memory chain as it was at construction time, `None` if there was no
    /// in-memory head.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// Canonical in-memory state handle; used for pending, safe and finalized block lookups.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
55
56impl<N: ProviderNodeTypes> ConsistentProvider<N> {
57 pub fn new(
63 storage_provider_factory: ProviderFactory<N>,
64 state: CanonicalInMemoryState<N::Primitives>,
65 ) -> ProviderResult<Self> {
66 let head_block = state.head_state();
74 let storage_provider = storage_provider_factory.database_provider_ro()?;
75 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
76 }
77
78 fn convert_range_bounds<T>(
80 &self,
81 range: impl RangeBounds<T>,
82 end_unbounded: impl FnOnce() -> T,
83 ) -> (T, T)
84 where
85 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
86 {
87 let start = match range.start_bound() {
88 Bound::Included(&n) => n,
89 Bound::Excluded(&n) => n + T::from(1u8),
90 Bound::Unbounded => T::from(0u8),
91 };
92
93 let end = match range.end_bound() {
94 Bound::Included(&n) => n,
95 Bound::Excluded(&n) => n - T::from(1u8),
96 Bound::Unbounded => end_unbounded(),
97 };
98
99 (start, end)
100 }
101
102 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
104 trace!(target: "providers::blockchain", "Getting latest block state provider");
105
106 if let Some(state) = &self.head_block {
108 trace!(target: "providers::blockchain", "Using head state for latest state provider");
109 Ok(self.block_state_provider_ref(state)?.boxed())
110 } else {
111 trace!(target: "providers::blockchain", "Using database state for latest state provider");
112 Ok(self.storage_provider.latest())
113 }
114 }
115
116 fn history_by_block_hash_ref<'a>(
117 &'a self,
118 block_hash: BlockHash,
119 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
120 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
121
122 self.get_in_memory_or_storage_by_block(
123 block_hash.into(),
124 |_| self.storage_provider.history_by_block_hash(block_hash),
125 |block_state| {
126 let state_provider = self.block_state_provider_ref(block_state)?;
127 Ok(Box::new(state_provider))
128 },
129 )
130 }
131
132 fn state_by_block_number_ref<'a>(
134 &'a self,
135 number: BlockNumber,
136 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
137 let hash =
138 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
139 self.history_by_block_hash_ref(hash)
140 }
141
142 pub fn get_state(
146 &self,
147 range: RangeInclusive<BlockNumber>,
148 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
149 if range.is_empty() {
150 return Ok(None)
151 }
152 let start_block_number = *range.start();
153 let end_block_number = *range.end();
154
155 let mut block_bodies = Vec::new();
157 for block_num in range.clone() {
158 let block_body = self
159 .block_body_indices(block_num)?
160 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
161 block_bodies.push((block_num, block_body))
162 }
163
164 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
166 else {
167 return Ok(None)
168 };
169 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
170 return Ok(None)
171 };
172
173 let mut account_changeset = Vec::new();
174 for block_num in range.clone() {
175 let changeset =
176 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
177 account_changeset.extend(changeset);
178 }
179
180 let mut storage_changeset = Vec::new();
181 for block_num in range {
182 let changeset = self.storage_changeset(block_num)?;
183 storage_changeset.extend(changeset);
184 }
185
186 let (state, reverts) =
187 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
188
189 let mut receipt_iter =
190 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
191
192 let mut receipts = Vec::with_capacity(block_bodies.len());
193 for (_, block_body) in block_bodies {
195 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
196 for tx_num in block_body.tx_num_range() {
197 let receipt = receipt_iter
198 .next()
199 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
200 block_receipts.push(receipt);
201 }
202 receipts.push(block_receipts);
203 }
204
205 Ok(Some(ExecutionOutcome::new_init(
206 state,
207 reverts,
208 Vec::new(),
210 receipts,
211 start_block_number,
212 Vec::new(),
213 )))
214 }
215
216 fn populate_bundle_state(
220 &self,
221 account_changeset: Vec<(u64, AccountBeforeTx)>,
222 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
223 block_range_end: BlockNumber,
224 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
225 let mut state: BundleStateInit = HashMap::default();
226 let mut reverts: RevertsInit = HashMap::default();
227 let state_provider = self.state_by_block_number_ref(block_range_end)?;
228
229 for (block_number, account_before) in account_changeset.into_iter().rev() {
231 let AccountBeforeTx { info: old_info, address } = account_before;
232 match state.entry(address) {
233 hash_map::Entry::Vacant(entry) => {
234 let new_info = state_provider.basic_account(&address)?;
235 entry.insert((old_info, new_info, HashMap::default()));
236 }
237 hash_map::Entry::Occupied(mut entry) => {
238 entry.get_mut().0 = old_info;
240 }
241 }
242 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
244 }
245
246 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
248 let BlockNumberAddress((block_number, address)) = block_and_address;
249 let account_state = match state.entry(address) {
251 hash_map::Entry::Vacant(entry) => {
252 let present_info = state_provider.basic_account(&address)?;
253 entry.insert((present_info, present_info, HashMap::default()))
254 }
255 hash_map::Entry::Occupied(entry) => entry.into_mut(),
256 };
257
258 match account_state.2.entry(old_storage.key) {
260 hash_map::Entry::Vacant(entry) => {
261 let new_storage_value =
262 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
263 entry.insert((old_storage.value, new_storage_value));
264 }
265 hash_map::Entry::Occupied(mut entry) => {
266 entry.get_mut().0 = old_storage.value;
267 }
268 };
269
270 reverts
271 .entry(block_number)
272 .or_default()
273 .entry(address)
274 .or_default()
275 .1
276 .push(old_storage);
277 }
278
279 Ok((state, reverts))
280 }
281
282 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
294 &self,
295 range: impl RangeBounds<BlockNumber>,
296 fetch_db_range: F,
297 map_block_state_item: G,
298 mut predicate: P,
299 ) -> ProviderResult<Vec<T>>
300 where
301 F: FnOnce(
302 &DatabaseProviderRO<N::DB, N>,
303 RangeInclusive<BlockNumber>,
304 &mut P,
305 ) -> ProviderResult<Vec<T>>,
306 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
307 P: FnMut(&T) -> bool,
308 {
309 let mut in_memory_chain =
317 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
318 let db_provider = &self.storage_provider;
319
320 let (start, end) = self.convert_range_bounds(range, || {
321 in_memory_chain
323 .first()
324 .map(|b| b.number())
325 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
326 });
327
328 if start > end {
329 return Ok(vec![])
330 }
331
332 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
337 Some(lowest_memory_block) if lowest_memory_block <= end => {
338 let highest_memory_block =
339 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
340
341 let in_memory_range =
345 lowest_memory_block.max(start)..=end.min(highest_memory_block);
346
347 in_memory_chain.truncate(
350 in_memory_chain
351 .len()
352 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
353 );
354
355 let storage_range =
356 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
357
358 (Some((in_memory_chain, in_memory_range)), storage_range)
359 }
360 _ => {
361 drop(in_memory_chain);
363
364 (None, Some(start..=end))
365 }
366 };
367
368 let mut items = Vec::with_capacity((end - start + 1) as usize);
369
370 if let Some(storage_range) = storage_range {
371 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
372 items.append(&mut db_items);
373
374 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
377 return Ok(items)
378 }
379 }
380
381 if let Some((in_memory_chain, in_memory_range)) = in_memory {
382 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
383 debug_assert!(num == block.number());
384 if let Some(item) = map_block_state_item(block, &mut predicate) {
385 items.push(item);
386 } else {
387 break
388 }
389 }
390 }
391
392 Ok(items)
393 }
394
395 fn block_state_provider_ref(
397 &self,
398 state: &BlockState<N::Primitives>,
399 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
400 let anchor_hash = state.anchor().hash;
401 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
402 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
403 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
404 }
405
406 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
412 &self,
413 range: impl RangeBounds<BlockNumber>,
414 fetch_from_db: S,
415 fetch_from_block_state: M,
416 ) -> ProviderResult<Vec<R>>
417 where
418 S: FnOnce(
419 &DatabaseProviderRO<N::DB, N>,
420 RangeInclusive<TxNumber>,
421 ) -> ProviderResult<Vec<R>>,
422 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
423 {
424 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
425 let provider = &self.storage_provider;
426
427 let last_database_block_number = in_mem_chain
430 .last()
431 .map(|b| Ok(b.anchor().number))
432 .unwrap_or_else(|| provider.last_block_number())?;
433
434 let last_block_body_index = provider
437 .block_body_indices(last_database_block_number)?
438 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
439 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
440
441 let (start, end) = self.convert_range_bounds(range, || {
442 in_mem_chain
443 .iter()
444 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
445 .sum::<u64>() +
446 last_block_body_index.last_tx_num()
447 });
448
449 if start > end {
450 return Ok(vec![])
451 }
452
453 let mut tx_range = start..=end;
454
455 if *tx_range.end() < in_memory_tx_num {
458 return fetch_from_db(provider, tx_range);
459 }
460
461 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
462
463 if *tx_range.start() < in_memory_tx_num {
465 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
467
468 tx_range = in_memory_tx_num..=*tx_range.end();
470
471 items.extend(fetch_from_db(provider, db_range)?);
472 }
473
474 for block_state in in_mem_chain.iter().rev() {
476 let block_tx_count =
477 block_state.block_ref().recovered_block().body().transactions().len();
478 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
479
480 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
483 in_memory_tx_num += block_tx_count as u64;
484 continue
485 }
486
487 let skip = (tx_range.start() - in_memory_tx_num) as usize;
489
490 items.extend(fetch_from_block_state(
491 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
492 block_state,
493 )?);
494
495 in_memory_tx_num += block_tx_count as u64;
496
497 if in_memory_tx_num > *tx_range.end() {
499 break
500 }
501
502 tx_range = in_memory_tx_num..=*tx_range.end();
504 }
505
506 Ok(items)
507 }
508
509 fn get_in_memory_or_storage_by_tx<S, M, R>(
512 &self,
513 id: HashOrNumber,
514 fetch_from_db: S,
515 fetch_from_block_state: M,
516 ) -> ProviderResult<Option<R>>
517 where
518 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
519 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
520 {
521 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
522 let provider = &self.storage_provider;
523
524 let last_database_block_number = in_mem_chain
527 .last()
528 .map(|b| Ok(b.anchor().number))
529 .unwrap_or_else(|| provider.last_block_number())?;
530
531 let last_block_body_index = provider
534 .block_body_indices(last_database_block_number)?
535 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
536 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
537
538 if let HashOrNumber::Number(id) = id {
541 if id < in_memory_tx_num {
542 return fetch_from_db(provider)
543 }
544 }
545
546 for block_state in in_mem_chain.iter().rev() {
548 let executed_block = block_state.block_ref();
549 let block = executed_block.recovered_block();
550
551 for tx_index in 0..block.body().transactions().len() {
552 match id {
553 HashOrNumber::Hash(tx_hash) => {
554 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
555 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
556 }
557 }
558 HashOrNumber::Number(id) => {
559 if id == in_memory_tx_num {
560 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
561 }
562 }
563 }
564
565 in_memory_tx_num += 1;
566 }
567 }
568
569 if let HashOrNumber::Hash(_) = id {
571 return fetch_from_db(provider)
572 }
573
574 Ok(None)
575 }
576
577 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
579 &self,
580 id: BlockHashOrNumber,
581 fetch_from_db: S,
582 fetch_from_block_state: M,
583 ) -> ProviderResult<R>
584 where
585 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
586 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
587 {
588 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
589 return fetch_from_block_state(block_state)
590 }
591 fetch_from_db(&self.storage_provider)
592 }
593}
594
595impl<N: ProviderNodeTypes> ConsistentProvider<N> {
596 #[inline]
605 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
606 let latest = self.best_block_number()?;
607 if block_number > latest {
608 Err(ProviderError::HeaderNotFound(block_number.into()))
609 } else {
610 Ok(())
611 }
612 }
613}
614
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    /// The primitive types are taken unchanged from the node types `N`.
    type Primitives = N::Primitives;
}
618
619impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
620 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
621 self.storage_provider.static_file_provider()
622 }
623}
624
625impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
626 type Header = HeaderTy<N>;
627
628 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
629 self.get_in_memory_or_storage_by_block(
630 (*block_hash).into(),
631 |db_provider| db_provider.header(block_hash),
632 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
633 )
634 }
635
636 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
637 self.get_in_memory_or_storage_by_block(
638 num.into(),
639 |db_provider| db_provider.header_by_number(num),
640 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
641 )
642 }
643
644 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
645 if let Some(num) = self.block_number(*hash)? {
646 self.header_td_by_number(num)
647 } else {
648 Ok(None)
649 }
650 }
651
652 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
653 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
654 {
655 if let Some(last_finalized_num_hash) =
662 self.canonical_in_memory_state.get_finalized_num_hash()
663 {
664 last_finalized_num_hash.number
665 } else {
666 self.last_block_number()?
667 }
668 } else {
669 number
671 };
672 self.storage_provider.header_td_by_number(number)
673 }
674
675 fn headers_range(
676 &self,
677 range: impl RangeBounds<BlockNumber>,
678 ) -> ProviderResult<Vec<Self::Header>> {
679 self.get_in_memory_or_storage_by_block_range_while(
680 range,
681 |db_provider, range, _| db_provider.headers_range(range),
682 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
683 |_| true,
684 )
685 }
686
687 fn sealed_header(
688 &self,
689 number: BlockNumber,
690 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
691 self.get_in_memory_or_storage_by_block(
692 number.into(),
693 |db_provider| db_provider.sealed_header(number),
694 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
695 )
696 }
697
698 fn sealed_headers_range(
699 &self,
700 range: impl RangeBounds<BlockNumber>,
701 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
702 self.get_in_memory_or_storage_by_block_range_while(
703 range,
704 |db_provider, range, _| db_provider.sealed_headers_range(range),
705 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
706 |_| true,
707 )
708 }
709
710 fn sealed_headers_while(
711 &self,
712 range: impl RangeBounds<BlockNumber>,
713 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
714 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
715 self.get_in_memory_or_storage_by_block_range_while(
716 range,
717 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
718 |block_state, predicate| {
719 let header = block_state.block_ref().recovered_block().sealed_header();
720 predicate(header).then(|| header.clone())
721 },
722 predicate,
723 )
724 }
725}
726
727impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
728 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
729 self.get_in_memory_or_storage_by_block(
730 number.into(),
731 |db_provider| db_provider.block_hash(number),
732 |block_state| Ok(Some(block_state.hash())),
733 )
734 }
735
736 fn canonical_hashes_range(
737 &self,
738 start: BlockNumber,
739 end: BlockNumber,
740 ) -> ProviderResult<Vec<B256>> {
741 self.get_in_memory_or_storage_by_block_range_while(
742 start..end,
743 |db_provider, inclusive_range, _| {
744 db_provider
745 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
746 },
747 |block_state, _| Some(block_state.hash()),
748 |_| true,
749 )
750 }
751}
752
753impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
754 fn chain_info(&self) -> ProviderResult<ChainInfo> {
755 let best_number = self.best_block_number()?;
756 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
757 }
758
759 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
760 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
761 }
762
763 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
764 self.storage_provider.last_block_number()
765 }
766
767 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
768 self.get_in_memory_or_storage_by_block(
769 hash.into(),
770 |db_provider| db_provider.block_number(hash),
771 |block_state| Ok(Some(block_state.number())),
772 )
773 }
774}
775
776impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
777 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
778 Ok(self.canonical_in_memory_state.pending_block_num_hash())
779 }
780
781 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782 Ok(self.canonical_in_memory_state.get_safe_num_hash())
783 }
784
785 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
787 }
788}
789
790impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
791 type Block = BlockTy<N>;
792
793 fn find_block_by_hash(
794 &self,
795 hash: B256,
796 source: BlockSource,
797 ) -> ProviderResult<Option<Self::Block>> {
798 if matches!(source, BlockSource::Canonical | BlockSource::Any) {
799 if let Some(block) = self.get_in_memory_or_storage_by_block(
800 hash.into(),
801 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
802 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
803 )? {
804 return Ok(Some(block))
805 }
806 }
807
808 if matches!(source, BlockSource::Pending | BlockSource::Any) {
809 return Ok(self
810 .canonical_in_memory_state
811 .pending_block()
812 .filter(|b| b.hash() == hash)
813 .map(|b| b.into_block()))
814 }
815
816 Ok(None)
817 }
818
819 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
820 self.get_in_memory_or_storage_by_block(
821 id,
822 |db_provider| db_provider.block(id),
823 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824 )
825 }
826
827 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
828 Ok(self.canonical_in_memory_state.pending_block())
829 }
830
831 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
832 Ok(self.canonical_in_memory_state.pending_recovered_block())
833 }
834
835 fn pending_block_and_receipts(
836 &self,
837 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
838 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
839 }
840
841 fn recovered_block(
848 &self,
849 id: BlockHashOrNumber,
850 transaction_kind: TransactionVariant,
851 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
852 self.get_in_memory_or_storage_by_block(
853 id,
854 |db_provider| db_provider.recovered_block(id, transaction_kind),
855 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
856 )
857 }
858
859 fn sealed_block_with_senders(
860 &self,
861 id: BlockHashOrNumber,
862 transaction_kind: TransactionVariant,
863 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
864 self.get_in_memory_or_storage_by_block(
865 id,
866 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
867 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
868 )
869 }
870
871 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
872 self.get_in_memory_or_storage_by_block_range_while(
873 range,
874 |db_provider, range, _| db_provider.block_range(range),
875 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
876 |_| true,
877 )
878 }
879
880 fn block_with_senders_range(
881 &self,
882 range: RangeInclusive<BlockNumber>,
883 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
884 self.get_in_memory_or_storage_by_block_range_while(
885 range,
886 |db_provider, range, _| db_provider.block_with_senders_range(range),
887 |block_state, _| Some(block_state.block().recovered_block().clone()),
888 |_| true,
889 )
890 }
891
892 fn recovered_block_range(
893 &self,
894 range: RangeInclusive<BlockNumber>,
895 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
896 self.get_in_memory_or_storage_by_block_range_while(
897 range,
898 |db_provider, range, _| db_provider.recovered_block_range(range),
899 |block_state, _| Some(block_state.block().recovered_block().clone()),
900 |_| true,
901 )
902 }
903}
904
905impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
906 type Transaction = TxTy<N>;
907
908 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
909 self.get_in_memory_or_storage_by_tx(
910 tx_hash.into(),
911 |db_provider| db_provider.transaction_id(tx_hash),
912 |_, tx_number, _| Ok(Some(tx_number)),
913 )
914 }
915
916 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
917 self.get_in_memory_or_storage_by_tx(
918 id.into(),
919 |provider| provider.transaction_by_id(id),
920 |tx_index, _, block_state| {
921 Ok(block_state
922 .block_ref()
923 .recovered_block()
924 .body()
925 .transactions()
926 .get(tx_index)
927 .cloned())
928 },
929 )
930 }
931
932 fn transaction_by_id_unhashed(
933 &self,
934 id: TxNumber,
935 ) -> ProviderResult<Option<Self::Transaction>> {
936 self.get_in_memory_or_storage_by_tx(
937 id.into(),
938 |provider| provider.transaction_by_id_unhashed(id),
939 |tx_index, _, block_state| {
940 Ok(block_state
941 .block_ref()
942 .recovered_block()
943 .body()
944 .transactions()
945 .get(tx_index)
946 .cloned())
947 },
948 )
949 }
950
951 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
952 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
953 return Ok(Some(tx))
954 }
955
956 self.storage_provider.transaction_by_hash(hash)
957 }
958
959 fn transaction_by_hash_with_meta(
960 &self,
961 tx_hash: TxHash,
962 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
963 if let Some((tx, meta)) =
964 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
965 {
966 return Ok(Some((tx, meta)))
967 }
968
969 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
970 }
971
972 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
973 self.get_in_memory_or_storage_by_tx(
974 id.into(),
975 |provider| provider.transaction_block(id),
976 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
977 )
978 }
979
980 fn transactions_by_block(
981 &self,
982 id: BlockHashOrNumber,
983 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
984 self.get_in_memory_or_storage_by_block(
985 id,
986 |provider| provider.transactions_by_block(id),
987 |block_state| {
988 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
989 },
990 )
991 }
992
993 fn transactions_by_block_range(
994 &self,
995 range: impl RangeBounds<BlockNumber>,
996 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
997 self.get_in_memory_or_storage_by_block_range_while(
998 range,
999 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1000 |block_state, _| {
1001 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1002 },
1003 |_| true,
1004 )
1005 }
1006
1007 fn transactions_by_tx_range(
1008 &self,
1009 range: impl RangeBounds<TxNumber>,
1010 ) -> ProviderResult<Vec<Self::Transaction>> {
1011 self.get_in_memory_or_storage_by_tx_range(
1012 range,
1013 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1014 |index_range, block_state| {
1015 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1016 .to_vec())
1017 },
1018 )
1019 }
1020
1021 fn senders_by_tx_range(
1022 &self,
1023 range: impl RangeBounds<TxNumber>,
1024 ) -> ProviderResult<Vec<Address>> {
1025 self.get_in_memory_or_storage_by_tx_range(
1026 range,
1027 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1028 |index_range, block_state| {
1029 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1030 },
1031 )
1032 }
1033
1034 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1035 self.get_in_memory_or_storage_by_tx(
1036 id.into(),
1037 |provider| provider.transaction_sender(id),
1038 |tx_index, _, block_state| {
1039 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1040 },
1041 )
1042 }
1043}
1044
1045impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1046 type Receipt = ReceiptTy<N>;
1047
1048 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1049 self.get_in_memory_or_storage_by_tx(
1050 id.into(),
1051 |provider| provider.receipt(id),
1052 |tx_index, _, block_state| {
1053 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1054 },
1055 )
1056 }
1057
1058 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1059 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1060 let executed_block = block_state.block_ref();
1061 let block = executed_block.recovered_block();
1062 let receipts = block_state.executed_block_receipts();
1063
1064 debug_assert_eq!(
1066 block.body().transactions().len(),
1067 receipts.len(),
1068 "Mismatch between transaction and receipt count"
1069 );
1070
1071 if let Some(tx_index) =
1072 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1073 {
1074 return Ok(receipts.get(tx_index).cloned());
1076 }
1077 }
1078
1079 self.storage_provider.receipt_by_hash(hash)
1080 }
1081
1082 fn receipts_by_block(
1083 &self,
1084 block: BlockHashOrNumber,
1085 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1086 self.get_in_memory_or_storage_by_block(
1087 block,
1088 |db_provider| db_provider.receipts_by_block(block),
1089 |block_state| Ok(Some(block_state.executed_block_receipts())),
1090 )
1091 }
1092
1093 fn receipts_by_tx_range(
1094 &self,
1095 range: impl RangeBounds<TxNumber>,
1096 ) -> ProviderResult<Vec<Self::Receipt>> {
1097 self.get_in_memory_or_storage_by_tx_range(
1098 range,
1099 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1100 |index_range, block_state| {
1101 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1102 },
1103 )
1104 }
1105}
1106
1107impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1108 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1109 match block {
1110 BlockId::Hash(rpc_block_hash) => {
1111 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1112 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1113 if let Some(state) = self
1114 .head_block
1115 .as_ref()
1116 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1117 {
1118 receipts = Some(state.executed_block_receipts());
1119 }
1120 }
1121 Ok(receipts)
1122 }
1123 BlockId::Number(num_tag) => match num_tag {
1124 BlockNumberOrTag::Pending => Ok(self
1125 .canonical_in_memory_state
1126 .pending_state()
1127 .map(|block_state| block_state.executed_block_receipts())),
1128 _ => {
1129 if let Some(num) = self.convert_block_number(num_tag)? {
1130 self.receipts_by_block(num.into())
1131 } else {
1132 Ok(None)
1133 }
1134 }
1135 },
1136 }
1137 }
1138}
1139
1140impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1141 fn withdrawals_by_block(
1142 &self,
1143 id: BlockHashOrNumber,
1144 timestamp: u64,
1145 ) -> ProviderResult<Option<Withdrawals>> {
1146 if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1147 return Ok(None)
1148 }
1149
1150 self.get_in_memory_or_storage_by_block(
1151 id,
1152 |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1153 |block_state| {
1154 Ok(block_state.block_ref().recovered_block().body().withdrawals().cloned())
1155 },
1156 )
1157 }
1158}
1159
1160impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1161 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1162 self.get_in_memory_or_storage_by_block(
1163 id,
1164 |db_provider| db_provider.ommers(id),
1165 |block_state| {
1166 if self.chain_spec().is_paris_active_at_block(block_state.number()) {
1167 return Ok(Some(Vec::new()))
1168 }
1169
1170 Ok(block_state.block_ref().recovered_block().body().ommers().map(|o| o.to_vec()))
1171 },
1172 )
1173 }
1174}
1175
1176impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1177 fn block_body_indices(
1178 &self,
1179 number: BlockNumber,
1180 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1181 self.get_in_memory_or_storage_by_block(
1182 number.into(),
1183 |db_provider| db_provider.block_body_indices(number),
1184 |block_state| {
1185 let last_storage_block_number = block_state.anchor().number;
1187 let mut stored_indices = self
1188 .storage_provider
1189 .block_body_indices(last_storage_block_number)?
1190 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1191
1192 stored_indices.first_tx_num = stored_indices.next_tx_num();
1194 stored_indices.tx_count = 0;
1195
1196 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1198 let block_tx_count =
1199 state.block_ref().recovered_block().body().transactions().len() as u64;
1200 if state.block_ref().recovered_block().number() == number {
1201 stored_indices.tx_count = block_tx_count;
1202 } else {
1203 stored_indices.first_tx_num += block_tx_count;
1204 }
1205 }
1206
1207 Ok(Some(stored_indices))
1208 },
1209 )
1210 }
1211
1212 fn block_body_indices_range(
1213 &self,
1214 range: RangeInclusive<BlockNumber>,
1215 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1216 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1217 }
1218}
1219
1220impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1221 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1222 self.storage_provider.get_stage_checkpoint(id)
1223 }
1224
1225 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1226 self.storage_provider.get_stage_checkpoint_progress(id)
1227 }
1228
1229 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1230 self.storage_provider.get_all_checkpoints()
1231 }
1232}
1233
1234impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1235 fn get_prune_checkpoint(
1236 &self,
1237 segment: PruneSegment,
1238 ) -> ProviderResult<Option<PruneCheckpoint>> {
1239 self.storage_provider.get_prune_checkpoint(segment)
1240 }
1241
1242 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1243 self.storage_provider.get_prune_checkpoints()
1244 }
1245}
1246
1247impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1248 type ChainSpec = N::ChainSpec;
1249
1250 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1251 ChainSpecProvider::chain_spec(&self.storage_provider)
1252 }
1253}
1254
1255impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1256 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1257 match id {
1258 BlockId::Number(num) => self.block_by_number_or_tag(num),
1259 BlockId::Hash(hash) => {
1260 if Some(true) == hash.require_canonical {
1265 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1267 } else {
1268 self.block_by_hash(hash.block_hash)
1269 }
1270 }
1271 }
1272 }
1273
1274 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1275 Ok(match id {
1276 BlockNumberOrTag::Latest => {
1277 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1278 }
1279 BlockNumberOrTag::Finalized => {
1280 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1281 }
1282 BlockNumberOrTag::Safe => {
1283 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1284 }
1285 BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1286 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1287
1288 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1289 })
1290 }
1291
1292 fn sealed_header_by_number_or_tag(
1293 &self,
1294 id: BlockNumberOrTag,
1295 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1296 match id {
1297 BlockNumberOrTag::Latest => {
1298 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1299 }
1300 BlockNumberOrTag::Finalized => {
1301 Ok(self.canonical_in_memory_state.get_finalized_header())
1302 }
1303 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1304 BlockNumberOrTag::Earliest => self
1305 .header_by_number(0)?
1306 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1307 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1308 BlockNumberOrTag::Number(num) => self
1309 .header_by_number(num)?
1310 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1311 }
1312 }
1313
1314 fn sealed_header_by_id(
1315 &self,
1316 id: BlockId,
1317 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1318 Ok(match id {
1319 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1320 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1321 })
1322 }
1323
1324 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1325 Ok(match id {
1326 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1327 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1328 })
1329 }
1330
1331 fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1332 match id {
1333 BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1334 BlockId::Hash(hash) => {
1335 self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1338 }
1339 }
1340 }
1341}
1342
1343impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1344 fn storage_changeset(
1345 &self,
1346 block_number: BlockNumber,
1347 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1348 if let Some(state) =
1349 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1350 {
1351 let changesets = state
1352 .block()
1353 .execution_output
1354 .bundle
1355 .reverts
1356 .clone()
1357 .to_plain_state_reverts()
1358 .storage
1359 .into_iter()
1360 .flatten()
1361 .flat_map(|revert: PlainStorageRevert| {
1362 revert.storage_revert.into_iter().map(move |(key, value)| {
1363 (
1364 BlockNumberAddress((block_number, revert.address)),
1365 StorageEntry { key: key.into(), value: value.to_previous_value() },
1366 )
1367 })
1368 })
1369 .collect();
1370 Ok(changesets)
1371 } else {
1372 let storage_history_exists = self
1376 .storage_provider
1377 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1378 .and_then(|checkpoint| {
1379 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1384 })
1385 .unwrap_or(true);
1386
1387 if !storage_history_exists {
1388 return Err(ProviderError::StateAtBlockPruned(block_number))
1389 }
1390
1391 self.storage_provider.storage_changeset(block_number)
1392 }
1393 }
1394}
1395
1396impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1397 fn account_block_changeset(
1398 &self,
1399 block_number: BlockNumber,
1400 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1401 if let Some(state) =
1402 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1403 {
1404 let changesets = state
1405 .block_ref()
1406 .execution_output
1407 .bundle
1408 .reverts
1409 .clone()
1410 .to_plain_state_reverts()
1411 .accounts
1412 .into_iter()
1413 .flatten()
1414 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1415 .collect();
1416 Ok(changesets)
1417 } else {
1418 let account_history_exists = self
1422 .storage_provider
1423 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1424 .and_then(|checkpoint| {
1425 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1430 })
1431 .unwrap_or(true);
1432
1433 if !account_history_exists {
1434 return Err(ProviderError::StateAtBlockPruned(block_number))
1435 }
1436
1437 self.storage_provider.account_block_changeset(block_number)
1438 }
1439 }
1440}
1441
1442impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1443 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1445 let state_provider = self.latest_ref()?;
1447 state_provider.basic_account(address)
1448 }
1449}
1450
1451impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1452 type Receipt = ReceiptTy<N>;
1453
1454 fn get_state(
1464 &self,
1465 block: BlockNumber,
1466 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1467 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1468 let state = state.block_ref().execution_outcome().clone();
1469 Ok(Some(state))
1470 } else {
1471 Self::get_state(self, block..=block)
1472 }
1473 }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478 use crate::{
1479 providers::blockchain_provider::BlockchainProvider,
1480 test_utils::create_test_provider_factory, BlockWriter,
1481 };
1482 use alloy_eips::BlockHashOrNumber;
1483 use alloy_primitives::B256;
1484 use itertools::Itertools;
1485 use rand::Rng;
1486 use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, NewCanonicalChain};
1487 use reth_db_api::models::AccountBeforeTx;
1488 use reth_execution_types::ExecutionOutcome;
1489 use reth_primitives::{RecoveredBlock, SealedBlock};
1490 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1491 use reth_testing_utils::generators::{
1492 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1493 };
1494 use revm_database::BundleState;
1495 use std::{
1496 ops::{Bound, Range, RangeBounds},
1497 sync::Arc,
1498 };
1499
1500 const TEST_BLOCKS_COUNT: usize = 5;
1501
1502 fn random_blocks(
1503 rng: &mut impl Rng,
1504 database_blocks: usize,
1505 in_memory_blocks: usize,
1506 requests_count: Option<Range<u8>>,
1507 withdrawals_count: Option<Range<u8>>,
1508 tx_count: impl RangeBounds<u8>,
1509 ) -> (Vec<SealedBlock>, Vec<SealedBlock>) {
1510 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1511
1512 let tx_start = match tx_count.start_bound() {
1513 Bound::Included(&n) | Bound::Excluded(&n) => n,
1514 Bound::Unbounded => u8::MIN,
1515 };
1516 let tx_end = match tx_count.end_bound() {
1517 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1518 Bound::Unbounded => u8::MAX,
1519 };
1520
1521 let blocks = random_block_range(
1522 rng,
1523 0..=block_range,
1524 BlockRangeParams {
1525 parent: Some(B256::ZERO),
1526 tx_count: tx_start..tx_end,
1527 requests_count,
1528 withdrawals_count,
1529 },
1530 );
1531 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1532 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1533 }
1534
    /// Checks `find_block_by_hash` against every [`BlockSource`] for blocks that live in the
    /// database, in the in-memory canonical chain and in the pending slot.
    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        // Random number generator and provider factory used by the whole test.
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate random blocks 0..=10; the first five go to the database, the rest is kept
        // for the in-memory part.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Persist the database blocks.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Build the provider under test on top of the populated factory.
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks referenced by the assertions below.
        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // Nothing has been added to the in-memory state yet, so the in-memory block must not
        // be found through any source.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        // No pending block has been set either.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory state.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlockWithTrieUpdates::new(
                Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                Default::default(),
                Default::default(),
                Default::default(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // Re-create the consistent provider after the in-memory state changed.
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now expected to be found.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into_block())
        );

        // A database block is found by hash as well.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into_block())
        );

        // A database block is never reported as pending.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Set the last in-memory block as the pending block. This goes through `provider`,
        // while the assertion below still queries the existing `consistent_provider`.
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    last_in_mem_block.clone(),
                    Default::default(),
                )),
                execution_output: Default::default(),
                hashed_state: Default::default(),
            },
            trie: Default::default(),
        });

        // The pending block is expected to be found through the pending source.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone_block())
        );

        Ok(())
    }
1651
1652 #[test]
1653 fn test_block_reader_block() -> eyre::Result<()> {
1654 let mut rng = generators::rng();
1656 let factory = create_test_provider_factory();
1657
1658 let blocks = random_block_range(
1660 &mut rng,
1661 0..=10,
1662 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1663 );
1664 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1665
1666 let provider_rw = factory.provider_rw()?;
1668 for block in database_blocks {
1669 provider_rw.insert_historical_block(
1670 block.clone().try_recover().expect("failed to seal block with senders"),
1671 )?;
1672 }
1673 provider_rw.commit()?;
1674
1675 let provider = BlockchainProvider::new(factory)?;
1677 let consistent_provider = provider.consistent_provider()?;
1678
1679 let first_in_mem_block = in_memory_blocks.first().unwrap();
1681 let first_db_block = database_blocks.first().unwrap();
1683
1684 assert_eq!(
1686 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1687 None
1688 );
1689 assert_eq!(
1690 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1691 None
1692 );
1693
1694 let in_memory_block_senders =
1696 first_in_mem_block.senders().expect("failed to recover senders");
1697 let chain = NewCanonicalChain::Commit {
1698 new: vec![ExecutedBlockWithTrieUpdates::new(
1699 Arc::new(RecoveredBlock::new_sealed(
1700 first_in_mem_block.clone(),
1701 in_memory_block_senders,
1702 )),
1703 Default::default(),
1704 Default::default(),
1705 Default::default(),
1706 )],
1707 };
1708 consistent_provider.canonical_in_memory_state.update_chain(chain);
1709
1710 let consistent_provider = provider.consistent_provider()?;
1711
1712 assert_eq!(
1714 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1715 Some(first_in_mem_block.clone().into_block())
1716 );
1717 assert_eq!(
1718 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1719 Some(first_in_mem_block.clone().into_block())
1720 );
1721
1722 assert_eq!(
1724 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1725 Some(first_db_block.clone().into_block())
1726 );
1727 assert_eq!(
1728 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1729 Some(first_db_block.clone().into_block())
1730 );
1731
1732 Ok(())
1733 }
1734
    /// Checks that `account_block_changeset` returns the expected changeset both for a block
    /// stored in the database and for a block committed to the in-memory canonical chain.
    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        // `TEST_BLOCKS_COUNT` database blocks followed by a single in-memory block.
        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        // Block numbers referenced below.
        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        // Changesets for the database blocks, starting from the random accounts with empty
        // storage.
        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        // Changesets for the in-memory block, continuing from the state produced for the
        // database blocks.
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        // Write the database blocks together with their state and per-block account reverts.
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets
                        .iter()
                        .map(|block_changesets| {
                            block_changesets.iter().map(|(address, account, _)| {
                                (*address, Some(Some((*account).into())), [])
                            })
                        })
                        .collect::<Vec<_>>(),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        // Only one in-memory block was generated, so only its changeset is needed.
        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        // Commit the in-memory block with an execution outcome carrying its account reverts.
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlockWithTrieUpdates::new(
                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
                        Arc::new(ExecutionOutcome {
                            bundle: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            first_block: first_in_memory_block,
                            ..Default::default()
                        }),
                        Default::default(),
                        Default::default(),
                    )
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        // The consistent provider is created only after the in-memory chain was updated.
        let consistent_provider = provider.consistent_provider()?;

        // The last database block must yield the last generated database changeset, compared
        // in address order.
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .next_back()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // The in-memory block must yield the in-memory changeset, compared in address order.
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
1851}