1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider, TrieReader,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12 HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_storage_api::{
27 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
28 StorageChangeSetReader, TryIntoHistoricalStateProvider,
29};
30use reth_storage_errors::provider::ProviderResult;
31use reth_trie::updates::TrieUpdatesSorted;
32use revm_database::states::PlainStorageRevert;
33use std::{
34 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
35 sync::Arc,
36};
37use tracing::trace;
38
/// Provider that answers queries from a database provider combined with the in-memory chain
/// that was captured when the provider was created.
#[derive(Debug)]
#[doc(hidden)]
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Database provider obtained from the [`ProviderFactory`] in `new`.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head of the in-memory chain captured in `new`; `None` when no in-memory head state was
    /// available.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// Canonical in-memory state handle, used for pending, safe and finalized block lookups.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
55
56impl<N: ProviderNodeTypes> ConsistentProvider<N> {
57 pub fn new(
63 storage_provider_factory: ProviderFactory<N>,
64 state: CanonicalInMemoryState<N::Primitives>,
65 ) -> ProviderResult<Self> {
66 let head_block = state.head_state();
74 let storage_provider = storage_provider_factory.database_provider_ro()?;
75 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
76 }
77
78 fn convert_range_bounds<T>(
80 &self,
81 range: impl RangeBounds<T>,
82 end_unbounded: impl FnOnce() -> T,
83 ) -> (T, T)
84 where
85 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
86 {
87 let start = match range.start_bound() {
88 Bound::Included(&n) => n,
89 Bound::Excluded(&n) => n + T::from(1u8),
90 Bound::Unbounded => T::from(0u8),
91 };
92
93 let end = match range.end_bound() {
94 Bound::Included(&n) => n,
95 Bound::Excluded(&n) => n - T::from(1u8),
96 Bound::Unbounded => end_unbounded(),
97 };
98
99 (start, end)
100 }
101
102 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
104 trace!(target: "providers::blockchain", "Getting latest block state provider");
105
106 if let Some(state) = &self.head_block {
108 trace!(target: "providers::blockchain", "Using head state for latest state provider");
109 Ok(self.block_state_provider_ref(state)?.boxed())
110 } else {
111 trace!(target: "providers::blockchain", "Using database state for latest state provider");
112 Ok(self.storage_provider.latest())
113 }
114 }
115
116 fn history_by_block_hash_ref<'a>(
117 &'a self,
118 block_hash: BlockHash,
119 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
120 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
121
122 self.get_in_memory_or_storage_by_block(
123 block_hash.into(),
124 |_| self.storage_provider.history_by_block_hash(block_hash),
125 |block_state| {
126 let state_provider = self.block_state_provider_ref(block_state)?;
127 Ok(Box::new(state_provider))
128 },
129 )
130 }
131
132 fn state_by_block_number_ref<'a>(
134 &'a self,
135 number: BlockNumber,
136 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
137 let hash =
138 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
139 self.history_by_block_hash_ref(hash)
140 }
141
142 pub fn get_state(
146 &self,
147 range: RangeInclusive<BlockNumber>,
148 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
149 if range.is_empty() {
150 return Ok(None)
151 }
152 let start_block_number = *range.start();
153 let end_block_number = *range.end();
154
155 let mut block_bodies = Vec::new();
157 for block_num in range.clone() {
158 let block_body = self
159 .block_body_indices(block_num)?
160 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
161 block_bodies.push((block_num, block_body))
162 }
163
164 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
166 else {
167 return Ok(None)
168 };
169 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
170 return Ok(None)
171 };
172
173 let mut account_changeset = Vec::new();
174 for block_num in range.clone() {
175 let changeset =
176 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
177 account_changeset.extend(changeset);
178 }
179
180 let mut storage_changeset = Vec::new();
181 for block_num in range {
182 let changeset = self.storage_changeset(block_num)?;
183 storage_changeset.extend(changeset);
184 }
185
186 let (state, reverts) =
187 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
188
189 let mut receipt_iter =
190 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
191
192 let mut receipts = Vec::with_capacity(block_bodies.len());
193 for (_, block_body) in block_bodies {
195 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
196 for tx_num in block_body.tx_num_range() {
197 let receipt = receipt_iter
198 .next()
199 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
200 block_receipts.push(receipt);
201 }
202 receipts.push(block_receipts);
203 }
204
205 Ok(Some(ExecutionOutcome::new_init(
206 state,
207 reverts,
208 Vec::new(),
210 receipts,
211 start_block_number,
212 Vec::new(),
213 )))
214 }
215
216 fn populate_bundle_state(
220 &self,
221 account_changeset: Vec<(u64, AccountBeforeTx)>,
222 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
223 block_range_end: BlockNumber,
224 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
225 let mut state: BundleStateInit = HashMap::default();
226 let mut reverts: RevertsInit = HashMap::default();
227 let state_provider = self.state_by_block_number_ref(block_range_end)?;
228
229 for (block_number, account_before) in account_changeset.into_iter().rev() {
231 let AccountBeforeTx { info: old_info, address } = account_before;
232 match state.entry(address) {
233 hash_map::Entry::Vacant(entry) => {
234 let new_info = state_provider.basic_account(&address)?;
235 entry.insert((old_info, new_info, HashMap::default()));
236 }
237 hash_map::Entry::Occupied(mut entry) => {
238 entry.get_mut().0 = old_info;
240 }
241 }
242 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
244 }
245
246 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
248 let BlockNumberAddress((block_number, address)) = block_and_address;
249 let account_state = match state.entry(address) {
251 hash_map::Entry::Vacant(entry) => {
252 let present_info = state_provider.basic_account(&address)?;
253 entry.insert((present_info, present_info, HashMap::default()))
254 }
255 hash_map::Entry::Occupied(entry) => entry.into_mut(),
256 };
257
258 match account_state.2.entry(old_storage.key) {
260 hash_map::Entry::Vacant(entry) => {
261 let new_storage_value =
262 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
263 entry.insert((old_storage.value, new_storage_value));
264 }
265 hash_map::Entry::Occupied(mut entry) => {
266 entry.get_mut().0 = old_storage.value;
267 }
268 };
269
270 reverts
271 .entry(block_number)
272 .or_default()
273 .entry(address)
274 .or_default()
275 .1
276 .push(old_storage);
277 }
278
279 Ok((state, reverts))
280 }
281
282 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
294 &self,
295 range: impl RangeBounds<BlockNumber>,
296 fetch_db_range: F,
297 map_block_state_item: G,
298 mut predicate: P,
299 ) -> ProviderResult<Vec<T>>
300 where
301 F: FnOnce(
302 &DatabaseProviderRO<N::DB, N>,
303 RangeInclusive<BlockNumber>,
304 &mut P,
305 ) -> ProviderResult<Vec<T>>,
306 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
307 P: FnMut(&T) -> bool,
308 {
309 let mut in_memory_chain =
317 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
318 let db_provider = &self.storage_provider;
319
320 let (start, end) = self.convert_range_bounds(range, || {
321 in_memory_chain
323 .first()
324 .map(|b| b.number())
325 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
326 });
327
328 if start > end {
329 return Ok(vec![])
330 }
331
332 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
337 Some(lowest_memory_block) if lowest_memory_block <= end => {
338 let highest_memory_block =
339 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
340
341 let in_memory_range =
345 lowest_memory_block.max(start)..=end.min(highest_memory_block);
346
347 in_memory_chain.truncate(
350 in_memory_chain
351 .len()
352 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
353 );
354
355 let storage_range =
356 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
357
358 (Some((in_memory_chain, in_memory_range)), storage_range)
359 }
360 _ => {
361 drop(in_memory_chain);
363
364 (None, Some(start..=end))
365 }
366 };
367
368 let mut items = Vec::with_capacity((end - start + 1) as usize);
369
370 if let Some(storage_range) = storage_range {
371 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
372 items.append(&mut db_items);
373
374 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
377 return Ok(items)
378 }
379 }
380
381 if let Some((in_memory_chain, in_memory_range)) = in_memory {
382 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
383 debug_assert!(num == block.number());
384 if let Some(item) = map_block_state_item(block, &mut predicate) {
385 items.push(item);
386 } else {
387 break
388 }
389 }
390 }
391
392 Ok(items)
393 }
394
395 fn block_state_provider_ref(
397 &self,
398 state: &BlockState<N::Primitives>,
399 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
400 let anchor_hash = state.anchor().hash;
401 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
402 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
403 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
404 }
405
406 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
412 &self,
413 range: impl RangeBounds<BlockNumber>,
414 fetch_from_db: S,
415 fetch_from_block_state: M,
416 ) -> ProviderResult<Vec<R>>
417 where
418 S: FnOnce(
419 &DatabaseProviderRO<N::DB, N>,
420 RangeInclusive<TxNumber>,
421 ) -> ProviderResult<Vec<R>>,
422 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
423 {
424 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
425 let provider = &self.storage_provider;
426
427 let last_database_block_number = in_mem_chain
430 .last()
431 .map(|b| Ok(b.anchor().number))
432 .unwrap_or_else(|| provider.last_block_number())?;
433
434 let last_block_body_index = provider
437 .block_body_indices(last_database_block_number)?
438 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
439 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
440
441 let (start, end) = self.convert_range_bounds(range, || {
442 in_mem_chain
443 .iter()
444 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
445 .sum::<u64>() +
446 last_block_body_index.last_tx_num()
447 });
448
449 if start > end {
450 return Ok(vec![])
451 }
452
453 let mut tx_range = start..=end;
454
455 if *tx_range.end() < in_memory_tx_num {
458 return fetch_from_db(provider, tx_range);
459 }
460
461 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
462
463 if *tx_range.start() < in_memory_tx_num {
465 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
467
468 tx_range = in_memory_tx_num..=*tx_range.end();
470
471 items.extend(fetch_from_db(provider, db_range)?);
472 }
473
474 for block_state in in_mem_chain.iter().rev() {
476 let block_tx_count =
477 block_state.block_ref().recovered_block().body().transactions().len();
478 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
479
480 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
483 in_memory_tx_num += block_tx_count as u64;
484 continue
485 }
486
487 let skip = (tx_range.start() - in_memory_tx_num) as usize;
489
490 items.extend(fetch_from_block_state(
491 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
492 block_state,
493 )?);
494
495 in_memory_tx_num += block_tx_count as u64;
496
497 if in_memory_tx_num > *tx_range.end() {
499 break
500 }
501
502 tx_range = in_memory_tx_num..=*tx_range.end();
504 }
505
506 Ok(items)
507 }
508
509 fn get_in_memory_or_storage_by_tx<S, M, R>(
512 &self,
513 id: HashOrNumber,
514 fetch_from_db: S,
515 fetch_from_block_state: M,
516 ) -> ProviderResult<Option<R>>
517 where
518 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
519 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
520 {
521 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
522 let provider = &self.storage_provider;
523
524 let last_database_block_number = in_mem_chain
527 .last()
528 .map(|b| Ok(b.anchor().number))
529 .unwrap_or_else(|| provider.last_block_number())?;
530
531 let last_block_body_index = provider
534 .block_body_indices(last_database_block_number)?
535 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
536 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
537
538 if let HashOrNumber::Number(id) = id &&
541 id < in_memory_tx_num
542 {
543 return fetch_from_db(provider)
544 }
545
546 for block_state in in_mem_chain.iter().rev() {
548 let executed_block = block_state.block_ref();
549 let block = executed_block.recovered_block();
550
551 for tx_index in 0..block.body().transactions().len() {
552 match id {
553 HashOrNumber::Hash(tx_hash) => {
554 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
555 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
556 }
557 }
558 HashOrNumber::Number(id) => {
559 if id == in_memory_tx_num {
560 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
561 }
562 }
563 }
564
565 in_memory_tx_num += 1;
566 }
567 }
568
569 if let HashOrNumber::Hash(_) = id {
571 return fetch_from_db(provider)
572 }
573
574 Ok(None)
575 }
576
577 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
579 &self,
580 id: BlockHashOrNumber,
581 fetch_from_db: S,
582 fetch_from_block_state: M,
583 ) -> ProviderResult<R>
584 where
585 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
586 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
587 {
588 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
589 return fetch_from_block_state(block_state)
590 }
591 fetch_from_db(&self.storage_provider)
592 }
593
594 pub(crate) fn into_state_provider_at_block_hash(
596 self,
597 block_hash: BlockHash,
598 ) -> ProviderResult<Box<dyn StateProvider>> {
599 let Self { storage_provider, head_block, .. } = self;
600 let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
601 let block_number = storage_provider
602 .block_number(block_hash)?
603 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
604 storage_provider.try_into_history_at_block(block_number)
605 };
606 if let Some(Some(block_state)) =
607 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
608 {
609 let anchor_hash = block_state.anchor().hash;
610 let latest_historical = into_history_at_block_hash(anchor_hash)?;
611 return Ok(Box::new(block_state.state_provider(latest_historical)));
612 }
613 into_history_at_block_hash(block_hash)
614 }
615}
616
617impl<N: ProviderNodeTypes> ConsistentProvider<N> {
618 #[inline]
627 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
628 let latest = self.best_block_number()?;
629 if block_number > latest {
630 Err(ProviderError::HeaderNotFound(block_number.into()))
631 } else {
632 Ok(())
633 }
634 }
635}
636
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    /// The primitive types are taken from the node types `N`.
    type Primitives = N::Primitives;
}
640
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the underlying storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }
}
646
647impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
648 type Header = HeaderTy<N>;
649
650 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
651 self.get_in_memory_or_storage_by_block(
652 block_hash.into(),
653 |db_provider| db_provider.header(block_hash),
654 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
655 )
656 }
657
658 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
659 self.get_in_memory_or_storage_by_block(
660 num.into(),
661 |db_provider| db_provider.header_by_number(num),
662 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
663 )
664 }
665
666 fn headers_range(
667 &self,
668 range: impl RangeBounds<BlockNumber>,
669 ) -> ProviderResult<Vec<Self::Header>> {
670 self.get_in_memory_or_storage_by_block_range_while(
671 range,
672 |db_provider, range, _| db_provider.headers_range(range),
673 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
674 |_| true,
675 )
676 }
677
678 fn sealed_header(
679 &self,
680 number: BlockNumber,
681 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
682 self.get_in_memory_or_storage_by_block(
683 number.into(),
684 |db_provider| db_provider.sealed_header(number),
685 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
686 )
687 }
688
689 fn sealed_headers_range(
690 &self,
691 range: impl RangeBounds<BlockNumber>,
692 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
693 self.get_in_memory_or_storage_by_block_range_while(
694 range,
695 |db_provider, range, _| db_provider.sealed_headers_range(range),
696 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
697 |_| true,
698 )
699 }
700
701 fn sealed_headers_while(
702 &self,
703 range: impl RangeBounds<BlockNumber>,
704 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
705 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
706 self.get_in_memory_or_storage_by_block_range_while(
707 range,
708 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
709 |block_state, predicate| {
710 let header = block_state.block_ref().recovered_block().sealed_header();
711 predicate(header).then(|| header.clone())
712 },
713 predicate,
714 )
715 }
716}
717
718impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
719 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
720 self.get_in_memory_or_storage_by_block(
721 number.into(),
722 |db_provider| db_provider.block_hash(number),
723 |block_state| Ok(Some(block_state.hash())),
724 )
725 }
726
727 fn canonical_hashes_range(
728 &self,
729 start: BlockNumber,
730 end: BlockNumber,
731 ) -> ProviderResult<Vec<B256>> {
732 self.get_in_memory_or_storage_by_block_range_while(
733 start..end,
734 |db_provider, inclusive_range, _| {
735 db_provider
736 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
737 },
738 |block_state, _| Some(block_state.hash()),
739 |_| true,
740 )
741 }
742}
743
744impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
745 fn chain_info(&self) -> ProviderResult<ChainInfo> {
746 let best_number = self.best_block_number()?;
747 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
748 }
749
750 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
751 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
752 }
753
754 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
755 self.storage_provider.last_block_number()
756 }
757
758 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
759 self.get_in_memory_or_storage_by_block(
760 hash.into(),
761 |db_provider| db_provider.block_number(hash),
762 |block_state| Ok(Some(block_state.number())),
763 )
764 }
765}
766
impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
    /// Returns the number and hash of the pending block as reported by the canonical in-memory
    /// state.
    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.pending_block_num_hash())
    }

    /// Returns the number and hash of the safe block as reported by the canonical in-memory
    /// state.
    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_safe_num_hash())
    }

    /// Returns the number and hash of the finalized block as reported by the canonical
    /// in-memory state.
    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
    }
}
780
781impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
782 type Block = BlockTy<N>;
783
784 fn find_block_by_hash(
785 &self,
786 hash: B256,
787 source: BlockSource,
788 ) -> ProviderResult<Option<Self::Block>> {
789 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
790 let Some(block) = self.get_in_memory_or_storage_by_block(
791 hash.into(),
792 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
793 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
794 )?
795 {
796 return Ok(Some(block))
797 }
798
799 if matches!(source, BlockSource::Pending | BlockSource::Any) {
800 return Ok(self
801 .canonical_in_memory_state
802 .pending_block()
803 .filter(|b| b.hash() == hash)
804 .map(|b| b.into_block()))
805 }
806
807 Ok(None)
808 }
809
810 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
811 self.get_in_memory_or_storage_by_block(
812 id,
813 |db_provider| db_provider.block(id),
814 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
815 )
816 }
817
818 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
819 Ok(self.canonical_in_memory_state.pending_recovered_block())
820 }
821
822 fn pending_block_and_receipts(
823 &self,
824 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
825 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
826 }
827
828 fn recovered_block(
835 &self,
836 id: BlockHashOrNumber,
837 transaction_kind: TransactionVariant,
838 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
839 self.get_in_memory_or_storage_by_block(
840 id,
841 |db_provider| db_provider.recovered_block(id, transaction_kind),
842 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
843 )
844 }
845
846 fn sealed_block_with_senders(
847 &self,
848 id: BlockHashOrNumber,
849 transaction_kind: TransactionVariant,
850 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851 self.get_in_memory_or_storage_by_block(
852 id,
853 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
854 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
855 )
856 }
857
858 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
859 self.get_in_memory_or_storage_by_block_range_while(
860 range,
861 |db_provider, range, _| db_provider.block_range(range),
862 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
863 |_| true,
864 )
865 }
866
867 fn block_with_senders_range(
868 &self,
869 range: RangeInclusive<BlockNumber>,
870 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
871 self.get_in_memory_or_storage_by_block_range_while(
872 range,
873 |db_provider, range, _| db_provider.block_with_senders_range(range),
874 |block_state, _| Some(block_state.block().recovered_block().clone()),
875 |_| true,
876 )
877 }
878
879 fn recovered_block_range(
880 &self,
881 range: RangeInclusive<BlockNumber>,
882 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
883 self.get_in_memory_or_storage_by_block_range_while(
884 range,
885 |db_provider, range, _| db_provider.recovered_block_range(range),
886 |block_state, _| Some(block_state.block().recovered_block().clone()),
887 |_| true,
888 )
889 }
890
891 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
892 self.get_in_memory_or_storage_by_tx(
893 id.into(),
894 |db_provider| db_provider.block_by_transaction_id(id),
895 |_, _, block_state| Ok(Some(block_state.number())),
896 )
897 }
898}
899
900impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
901 type Transaction = TxTy<N>;
902
903 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
904 self.get_in_memory_or_storage_by_tx(
905 tx_hash.into(),
906 |db_provider| db_provider.transaction_id(tx_hash),
907 |_, tx_number, _| Ok(Some(tx_number)),
908 )
909 }
910
911 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
912 self.get_in_memory_or_storage_by_tx(
913 id.into(),
914 |provider| provider.transaction_by_id(id),
915 |tx_index, _, block_state| {
916 Ok(block_state
917 .block_ref()
918 .recovered_block()
919 .body()
920 .transactions()
921 .get(tx_index)
922 .cloned())
923 },
924 )
925 }
926
927 fn transaction_by_id_unhashed(
928 &self,
929 id: TxNumber,
930 ) -> ProviderResult<Option<Self::Transaction>> {
931 self.get_in_memory_or_storage_by_tx(
932 id.into(),
933 |provider| provider.transaction_by_id_unhashed(id),
934 |tx_index, _, block_state| {
935 Ok(block_state
936 .block_ref()
937 .recovered_block()
938 .body()
939 .transactions()
940 .get(tx_index)
941 .cloned())
942 },
943 )
944 }
945
946 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
947 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
948 return Ok(Some(tx))
949 }
950
951 self.storage_provider.transaction_by_hash(hash)
952 }
953
954 fn transaction_by_hash_with_meta(
955 &self,
956 tx_hash: TxHash,
957 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
958 if let Some((tx, meta)) =
959 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
960 {
961 return Ok(Some((tx, meta)))
962 }
963
964 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
965 }
966
967 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
968 self.get_in_memory_or_storage_by_tx(
969 id.into(),
970 |provider| provider.transaction_block(id),
971 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
972 )
973 }
974
975 fn transactions_by_block(
976 &self,
977 id: BlockHashOrNumber,
978 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
979 self.get_in_memory_or_storage_by_block(
980 id,
981 |provider| provider.transactions_by_block(id),
982 |block_state| {
983 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
984 },
985 )
986 }
987
988 fn transactions_by_block_range(
989 &self,
990 range: impl RangeBounds<BlockNumber>,
991 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
992 self.get_in_memory_or_storage_by_block_range_while(
993 range,
994 |db_provider, range, _| db_provider.transactions_by_block_range(range),
995 |block_state, _| {
996 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
997 },
998 |_| true,
999 )
1000 }
1001
1002 fn transactions_by_tx_range(
1003 &self,
1004 range: impl RangeBounds<TxNumber>,
1005 ) -> ProviderResult<Vec<Self::Transaction>> {
1006 self.get_in_memory_or_storage_by_tx_range(
1007 range,
1008 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1009 |index_range, block_state| {
1010 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1011 .to_vec())
1012 },
1013 )
1014 }
1015
1016 fn senders_by_tx_range(
1017 &self,
1018 range: impl RangeBounds<TxNumber>,
1019 ) -> ProviderResult<Vec<Address>> {
1020 self.get_in_memory_or_storage_by_tx_range(
1021 range,
1022 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1023 |index_range, block_state| {
1024 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1025 },
1026 )
1027 }
1028
1029 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1030 self.get_in_memory_or_storage_by_tx(
1031 id.into(),
1032 |provider| provider.transaction_sender(id),
1033 |tx_index, _, block_state| {
1034 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1035 },
1036 )
1037 }
1038}
1039
1040impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1041 type Receipt = ReceiptTy<N>;
1042
1043 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1044 self.get_in_memory_or_storage_by_tx(
1045 id.into(),
1046 |provider| provider.receipt(id),
1047 |tx_index, _, block_state| {
1048 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1049 },
1050 )
1051 }
1052
1053 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1054 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1055 let executed_block = block_state.block_ref();
1056 let block = executed_block.recovered_block();
1057 let receipts = block_state.executed_block_receipts();
1058
1059 debug_assert_eq!(
1061 block.body().transactions().len(),
1062 receipts.len(),
1063 "Mismatch between transaction and receipt count"
1064 );
1065
1066 if let Some(tx_index) =
1067 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1068 {
1069 return Ok(receipts.get(tx_index).cloned());
1071 }
1072 }
1073
1074 self.storage_provider.receipt_by_hash(hash)
1075 }
1076
1077 fn receipts_by_block(
1078 &self,
1079 block: BlockHashOrNumber,
1080 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1081 self.get_in_memory_or_storage_by_block(
1082 block,
1083 |db_provider| db_provider.receipts_by_block(block),
1084 |block_state| Ok(Some(block_state.executed_block_receipts())),
1085 )
1086 }
1087
1088 fn receipts_by_tx_range(
1089 &self,
1090 range: impl RangeBounds<TxNumber>,
1091 ) -> ProviderResult<Vec<Self::Receipt>> {
1092 self.get_in_memory_or_storage_by_tx_range(
1093 range,
1094 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1095 |index_range, block_state| {
1096 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1097 },
1098 )
1099 }
1100
1101 fn receipts_by_block_range(
1102 &self,
1103 block_range: RangeInclusive<BlockNumber>,
1104 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1105 self.storage_provider.receipts_by_block_range(block_range)
1106 }
1107}
1108
1109impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1110 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1111 match block {
1112 BlockId::Hash(rpc_block_hash) => {
1113 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1114 if receipts.is_none() &&
1115 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1116 let Some(state) = self
1117 .head_block
1118 .as_ref()
1119 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1120 {
1121 receipts = Some(state.executed_block_receipts());
1122 }
1123 Ok(receipts)
1124 }
1125 BlockId::Number(num_tag) => match num_tag {
1126 BlockNumberOrTag::Pending => Ok(self
1127 .canonical_in_memory_state
1128 .pending_state()
1129 .map(|block_state| block_state.executed_block_receipts())),
1130 _ => {
1131 if let Some(num) = self.convert_block_number(num_tag)? {
1132 self.receipts_by_block(num.into())
1133 } else {
1134 Ok(None)
1135 }
1136 }
1137 },
1138 }
1139 }
1140}
1141
1142impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1143 fn block_body_indices(
1144 &self,
1145 number: BlockNumber,
1146 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1147 self.get_in_memory_or_storage_by_block(
1148 number.into(),
1149 |db_provider| db_provider.block_body_indices(number),
1150 |block_state| {
1151 let last_storage_block_number = block_state.anchor().number;
1153 let mut stored_indices = self
1154 .storage_provider
1155 .block_body_indices(last_storage_block_number)?
1156 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1157
1158 stored_indices.first_tx_num = stored_indices.next_tx_num();
1160 stored_indices.tx_count = 0;
1161
1162 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1164 let block_tx_count =
1165 state.block_ref().recovered_block().body().transactions().len() as u64;
1166 if state.block_ref().recovered_block().number() == number {
1167 stored_indices.tx_count = block_tx_count;
1168 } else {
1169 stored_indices.first_tx_num += block_tx_count;
1170 }
1171 }
1172
1173 Ok(Some(stored_indices))
1174 },
1175 )
1176 }
1177
1178 fn block_body_indices_range(
1179 &self,
1180 range: RangeInclusive<BlockNumber>,
1181 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1182 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1183 }
1184}
1185
1186impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1187 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1188 self.storage_provider.get_stage_checkpoint(id)
1189 }
1190
1191 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1192 self.storage_provider.get_stage_checkpoint_progress(id)
1193 }
1194
1195 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1196 self.storage_provider.get_all_checkpoints()
1197 }
1198}
1199
1200impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1201 fn get_prune_checkpoint(
1202 &self,
1203 segment: PruneSegment,
1204 ) -> ProviderResult<Option<PruneCheckpoint>> {
1205 self.storage_provider.get_prune_checkpoint(segment)
1206 }
1207
1208 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1209 self.storage_provider.get_prune_checkpoints()
1210 }
1211}
1212
1213impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1214 type ChainSpec = N::ChainSpec;
1215
1216 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1217 ChainSpecProvider::chain_spec(&self.storage_provider)
1218 }
1219}
1220
1221impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1222 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1223 match id {
1224 BlockId::Number(num) => self.block_by_number_or_tag(num),
1225 BlockId::Hash(hash) => {
1226 if Some(true) == hash.require_canonical {
1231 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1233 } else {
1234 self.block_by_hash(hash.block_hash)
1235 }
1236 }
1237 }
1238 }
1239
1240 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1241 Ok(match id {
1242 BlockNumberOrTag::Latest => {
1243 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1244 }
1245 BlockNumberOrTag::Finalized => {
1246 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1247 }
1248 BlockNumberOrTag::Safe => {
1249 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1250 }
1251 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1252 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1253
1254 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1255 })
1256 }
1257
1258 fn sealed_header_by_number_or_tag(
1259 &self,
1260 id: BlockNumberOrTag,
1261 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1262 match id {
1263 BlockNumberOrTag::Latest => {
1264 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1265 }
1266 BlockNumberOrTag::Finalized => {
1267 Ok(self.canonical_in_memory_state.get_finalized_header())
1268 }
1269 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1270 BlockNumberOrTag::Earliest => self
1271 .header_by_number(self.earliest_block_number()?)?
1272 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1273 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1274 BlockNumberOrTag::Number(num) => self
1275 .header_by_number(num)?
1276 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1277 }
1278 }
1279
1280 fn sealed_header_by_id(
1281 &self,
1282 id: BlockId,
1283 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1284 Ok(match id {
1285 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1286 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1287 })
1288 }
1289
1290 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1291 Ok(match id {
1292 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1293 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1294 })
1295 }
1296}
1297
1298impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1299 fn storage_changeset(
1300 &self,
1301 block_number: BlockNumber,
1302 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1303 if let Some(state) =
1304 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1305 {
1306 let changesets = state
1307 .block()
1308 .execution_output
1309 .bundle
1310 .reverts
1311 .clone()
1312 .to_plain_state_reverts()
1313 .storage
1314 .into_iter()
1315 .flatten()
1316 .flat_map(|revert: PlainStorageRevert| {
1317 revert.storage_revert.into_iter().map(move |(key, value)| {
1318 (
1319 BlockNumberAddress((block_number, revert.address)),
1320 StorageEntry { key: key.into(), value: value.to_previous_value() },
1321 )
1322 })
1323 })
1324 .collect();
1325 Ok(changesets)
1326 } else {
1327 let storage_history_exists = self
1331 .storage_provider
1332 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1333 .and_then(|checkpoint| {
1334 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1339 })
1340 .unwrap_or(true);
1341
1342 if !storage_history_exists {
1343 return Err(ProviderError::StateAtBlockPruned(block_number))
1344 }
1345
1346 self.storage_provider.storage_changeset(block_number)
1347 }
1348 }
1349}
1350
1351impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1352 fn account_block_changeset(
1353 &self,
1354 block_number: BlockNumber,
1355 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1356 if let Some(state) =
1357 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1358 {
1359 let changesets = state
1360 .block_ref()
1361 .execution_output
1362 .bundle
1363 .reverts
1364 .clone()
1365 .to_plain_state_reverts()
1366 .accounts
1367 .into_iter()
1368 .flatten()
1369 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1370 .collect();
1371 Ok(changesets)
1372 } else {
1373 let account_history_exists = self
1377 .storage_provider
1378 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1379 .and_then(|checkpoint| {
1380 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1385 })
1386 .unwrap_or(true);
1387
1388 if !account_history_exists {
1389 return Err(ProviderError::StateAtBlockPruned(block_number))
1390 }
1391
1392 self.storage_provider.account_block_changeset(block_number)
1393 }
1394 }
1395
1396 fn get_account_before_block(
1397 &self,
1398 block_number: BlockNumber,
1399 address: Address,
1400 ) -> ProviderResult<Option<AccountBeforeTx>> {
1401 if let Some(state) =
1402 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1403 {
1404 let changeset = state
1406 .block_ref()
1407 .execution_output
1408 .bundle
1409 .reverts
1410 .clone()
1411 .to_plain_state_reverts()
1412 .accounts
1413 .into_iter()
1414 .flatten()
1415 .find(|(addr, _)| addr == &address)
1416 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1417 Ok(changeset)
1418 } else {
1419 let account_history_exists = self
1422 .storage_provider
1423 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1424 .and_then(|checkpoint| {
1425 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1430 })
1431 .unwrap_or(true);
1432
1433 if !account_history_exists {
1434 return Err(ProviderError::StateAtBlockPruned(block_number))
1435 }
1436
1437 self.storage_provider.get_account_before_block(block_number, address)
1439 }
1440 }
1441}
1442
1443impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1444 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1446 let state_provider = self.latest_ref()?;
1448 state_provider.basic_account(address)
1449 }
1450}
1451
1452impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1453 type Receipt = ReceiptTy<N>;
1454
1455 fn get_state(
1465 &self,
1466 block: BlockNumber,
1467 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1468 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1469 let state = state.block_ref().execution_outcome().clone();
1470 Ok(Some(state))
1471 } else {
1472 Self::get_state(self, block..=block)
1473 }
1474 }
1475}
1476
1477impl<N: ProviderNodeTypes> TrieReader for ConsistentProvider<N> {
1478 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
1479 self.storage_provider.trie_reverts(from)
1480 }
1481
1482 fn get_block_trie_updates(
1483 &self,
1484 block_number: BlockNumber,
1485 ) -> ProviderResult<TrieUpdatesSorted> {
1486 self.storage_provider.get_block_trie_updates(block_number)
1487 }
1488}
1489
1490#[cfg(test)]
1491mod tests {
1492 use crate::{
1493 providers::blockchain_provider::BlockchainProvider,
1494 test_utils::create_test_provider_factory, BlockWriter,
1495 };
1496 use alloy_eips::BlockHashOrNumber;
1497 use alloy_primitives::B256;
1498 use itertools::Itertools;
1499 use rand::Rng;
1500 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1501 use reth_db_api::models::AccountBeforeTx;
1502 use reth_ethereum_primitives::Block;
1503 use reth_execution_types::ExecutionOutcome;
1504 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1505 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1506 use reth_testing_utils::generators::{
1507 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1508 };
1509 use revm_database::BundleState;
1510 use std::{
1511 ops::{Bound, Range, RangeBounds},
1512 sync::Arc,
1513 };
1514
1515 const TEST_BLOCKS_COUNT: usize = 5;
1516
1517 fn random_blocks(
1518 rng: &mut impl Rng,
1519 database_blocks: usize,
1520 in_memory_blocks: usize,
1521 requests_count: Option<Range<u8>>,
1522 withdrawals_count: Option<Range<u8>>,
1523 tx_count: impl RangeBounds<u8>,
1524 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1525 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1526
1527 let tx_start = match tx_count.start_bound() {
1528 Bound::Included(&n) | Bound::Excluded(&n) => n,
1529 Bound::Unbounded => u8::MIN,
1530 };
1531 let tx_end = match tx_count.end_bound() {
1532 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1533 Bound::Unbounded => u8::MAX,
1534 };
1535
1536 let blocks = random_block_range(
1537 rng,
1538 0..=block_range,
1539 BlockRangeParams {
1540 parent: Some(B256::ZERO),
1541 tx_count: tx_start..tx_end,
1542 requests_count,
1543 withdrawals_count,
1544 },
1545 );
1546 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1547 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1548 }
1549
1550 #[test]
1551 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1552 let mut rng = generators::rng();
1554 let factory = create_test_provider_factory();
1555
1556 let blocks = random_block_range(
1558 &mut rng,
1559 0..=10,
1560 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1561 );
1562 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1563
1564 let provider_rw = factory.provider_rw()?;
1566 for block in database_blocks {
1567 provider_rw.insert_block(
1568 block.clone().try_recover().expect("failed to seal block with senders"),
1569 )?;
1570 }
1571 provider_rw.commit()?;
1572
1573 let provider = BlockchainProvider::new(factory)?;
1575 let consistent_provider = provider.consistent_provider()?;
1576
1577 let first_db_block = database_blocks.first().unwrap();
1579 let first_in_mem_block = in_memory_blocks.first().unwrap();
1580 let last_in_mem_block = in_memory_blocks.last().unwrap();
1581
1582 assert_eq!(
1584 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1585 None
1586 );
1587 assert_eq!(
1588 consistent_provider
1589 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1590 None
1591 );
1592 assert_eq!(
1594 consistent_provider
1595 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1596 None
1597 );
1598
1599 let in_memory_block_senders =
1601 first_in_mem_block.senders().expect("failed to recover senders");
1602 let chain = NewCanonicalChain::Commit {
1603 new: vec![ExecutedBlock {
1604 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1605 first_in_mem_block.clone(),
1606 in_memory_block_senders,
1607 )),
1608 ..Default::default()
1609 }],
1610 };
1611 consistent_provider.canonical_in_memory_state.update_chain(chain);
1612 let consistent_provider = provider.consistent_provider()?;
1613
1614 assert_eq!(
1616 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1617 Some(first_in_mem_block.clone().into_block())
1618 );
1619 assert_eq!(
1620 consistent_provider
1621 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1622 Some(first_in_mem_block.clone().into_block())
1623 );
1624
1625 assert_eq!(
1627 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1628 Some(first_db_block.clone().into_block())
1629 );
1630 assert_eq!(
1631 consistent_provider
1632 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1633 Some(first_db_block.clone().into_block())
1634 );
1635
1636 assert_eq!(
1638 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1639 None
1640 );
1641
1642 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1644 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1645 last_in_mem_block.clone(),
1646 Default::default(),
1647 )),
1648 ..Default::default()
1649 });
1650
1651 assert_eq!(
1653 consistent_provider
1654 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1655 Some(last_in_mem_block.clone_block())
1656 );
1657
1658 Ok(())
1659 }
1660
1661 #[test]
1662 fn test_block_reader_block() -> eyre::Result<()> {
1663 let mut rng = generators::rng();
1665 let factory = create_test_provider_factory();
1666
1667 let blocks = random_block_range(
1669 &mut rng,
1670 0..=10,
1671 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1672 );
1673 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1674
1675 let provider_rw = factory.provider_rw()?;
1677 for block in database_blocks {
1678 provider_rw.insert_block(
1679 block.clone().try_recover().expect("failed to seal block with senders"),
1680 )?;
1681 }
1682 provider_rw.commit()?;
1683
1684 let provider = BlockchainProvider::new(factory)?;
1686 let consistent_provider = provider.consistent_provider()?;
1687
1688 let first_in_mem_block = in_memory_blocks.first().unwrap();
1690 let first_db_block = database_blocks.first().unwrap();
1692
1693 assert_eq!(
1695 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1696 None
1697 );
1698 assert_eq!(
1699 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1700 None
1701 );
1702
1703 let in_memory_block_senders =
1705 first_in_mem_block.senders().expect("failed to recover senders");
1706 let chain = NewCanonicalChain::Commit {
1707 new: vec![ExecutedBlock {
1708 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1709 first_in_mem_block.clone(),
1710 in_memory_block_senders,
1711 )),
1712 ..Default::default()
1713 }],
1714 };
1715 consistent_provider.canonical_in_memory_state.update_chain(chain);
1716
1717 let consistent_provider = provider.consistent_provider()?;
1718
1719 assert_eq!(
1721 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1722 Some(first_in_mem_block.clone().into_block())
1723 );
1724 assert_eq!(
1725 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1726 Some(first_in_mem_block.clone().into_block())
1727 );
1728
1729 assert_eq!(
1731 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1732 Some(first_db_block.clone().into_block())
1733 );
1734 assert_eq!(
1735 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1736 Some(first_db_block.clone().into_block())
1737 );
1738
1739 Ok(())
1740 }
1741
1742 #[test]
1743 fn test_changeset_reader() -> eyre::Result<()> {
1744 let mut rng = generators::rng();
1745
1746 let (database_blocks, in_memory_blocks) =
1747 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1748
1749 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1750 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1751 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1752
1753 let accounts = random_eoa_accounts(&mut rng, 2);
1754
1755 let (database_changesets, database_state) = random_changeset_range(
1756 &mut rng,
1757 &database_blocks,
1758 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1759 0..0,
1760 0..0,
1761 );
1762 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1763 &mut rng,
1764 &in_memory_blocks,
1765 database_state
1766 .iter()
1767 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1768 0..0,
1769 0..0,
1770 );
1771
1772 let factory = create_test_provider_factory();
1773
1774 let provider_rw = factory.provider_rw()?;
1775 provider_rw.append_blocks_with_state(
1776 database_blocks
1777 .into_iter()
1778 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1779 .collect(),
1780 &ExecutionOutcome {
1781 bundle: BundleState::new(
1782 database_state.into_iter().map(|(address, (account, _))| {
1783 (address, None, Some(account.into()), Default::default())
1784 }),
1785 database_changesets
1786 .iter()
1787 .map(|block_changesets| {
1788 block_changesets.iter().map(|(address, account, _)| {
1789 (*address, Some(Some((*account).into())), [])
1790 })
1791 })
1792 .collect::<Vec<_>>(),
1793 Vec::new(),
1794 ),
1795 first_block: first_database_block,
1796 ..Default::default()
1797 },
1798 Default::default(),
1799 )?;
1800 provider_rw.commit()?;
1801
1802 let provider = BlockchainProvider::new(factory)?;
1803
1804 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1805 let chain = NewCanonicalChain::Commit {
1806 new: vec![in_memory_blocks
1807 .first()
1808 .map(|block| {
1809 let senders = block.senders().expect("failed to recover senders");
1810 ExecutedBlock {
1811 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1812 block.clone(),
1813 senders,
1814 )),
1815 execution_output: Arc::new(ExecutionOutcome {
1816 bundle: BundleState::new(
1817 in_memory_state.into_iter().map(|(address, (account, _))| {
1818 (address, None, Some(account.into()), Default::default())
1819 }),
1820 [in_memory_changesets.iter().map(|(address, account, _)| {
1821 (*address, Some(Some((*account).into())), Vec::new())
1822 })],
1823 [],
1824 ),
1825 first_block: first_in_memory_block,
1826 ..Default::default()
1827 }),
1828 ..Default::default()
1829 }
1830 })
1831 .unwrap()],
1832 };
1833 provider.canonical_in_memory_state.update_chain(chain);
1834
1835 let consistent_provider = provider.consistent_provider()?;
1836
1837 assert_eq!(
1838 consistent_provider.account_block_changeset(last_database_block).unwrap(),
1839 database_changesets
1840 .into_iter()
1841 .next_back()
1842 .unwrap()
1843 .into_iter()
1844 .sorted_by_key(|(address, _, _)| *address)
1845 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1846 .collect::<Vec<_>>()
1847 );
1848 assert_eq!(
1849 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1850 in_memory_changesets
1851 .into_iter()
1852 .sorted_by_key(|(address, _, _)| *address)
1853 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1854 .collect::<Vec<_>>()
1855 );
1856
1857 Ok(())
1858 }
1859}