1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12 HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24 Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30 StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
221 &self,
222 account_changeset: Vec<(u64, AccountBeforeTx)>,
223 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224 block_range_end: BlockNumber,
225 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226 let mut state: BundleStateInit = HashMap::default();
227 let mut reverts: RevertsInit = HashMap::default();
228 let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230 for (block_number, account_before) in account_changeset.into_iter().rev() {
232 let AccountBeforeTx { info: old_info, address } = account_before;
233 match state.entry(address) {
234 hash_map::Entry::Vacant(entry) => {
235 let new_info = state_provider.basic_account(&address)?;
236 entry.insert((old_info, new_info, HashMap::default()));
237 }
238 hash_map::Entry::Occupied(mut entry) => {
239 entry.get_mut().0 = old_info;
241 }
242 }
243 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245 }
246
247 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249 let BlockNumberAddress((block_number, address)) = block_and_address;
250 let account_state = match state.entry(address) {
252 hash_map::Entry::Vacant(entry) => {
253 let present_info = state_provider.basic_account(&address)?;
254 entry.insert((present_info, present_info, HashMap::default()))
255 }
256 hash_map::Entry::Occupied(entry) => entry.into_mut(),
257 };
258
259 match account_state.2.entry(old_storage.key) {
261 hash_map::Entry::Vacant(entry) => {
262 let new_storage_value =
263 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264 entry.insert((old_storage.value, new_storage_value));
265 }
266 hash_map::Entry::Occupied(mut entry) => {
267 entry.get_mut().0 = old_storage.value;
268 }
269 };
270
271 reverts
272 .entry(block_number)
273 .or_default()
274 .entry(address)
275 .or_default()
276 .1
277 .push(old_storage);
278 }
279
280 Ok((state, reverts))
281 }
282
283 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295 &self,
296 range: impl RangeBounds<BlockNumber>,
297 fetch_db_range: F,
298 map_block_state_item: G,
299 mut predicate: P,
300 ) -> ProviderResult<Vec<T>>
301 where
302 F: FnOnce(
303 &DatabaseProviderRO<N::DB, N>,
304 RangeInclusive<BlockNumber>,
305 &mut P,
306 ) -> ProviderResult<Vec<T>>,
307 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308 P: FnMut(&T) -> bool,
309 {
310 let mut in_memory_chain =
318 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319 let db_provider = &self.storage_provider;
320
321 let (start, end) = self.convert_range_bounds(range, || {
322 in_memory_chain
324 .first()
325 .map(|b| b.number())
326 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327 });
328
329 if start > end {
330 return Ok(vec![])
331 }
332
333 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338 Some(lowest_memory_block) if lowest_memory_block <= end => {
339 let highest_memory_block =
340 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342 let in_memory_range =
346 lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348 in_memory_chain.truncate(
351 in_memory_chain
352 .len()
353 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354 );
355
356 let storage_range =
357 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359 (Some((in_memory_chain, in_memory_range)), storage_range)
360 }
361 _ => {
362 drop(in_memory_chain);
364
365 (None, Some(start..=end))
366 }
367 };
368
369 let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371 if let Some(storage_range) = storage_range {
372 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373 items.append(&mut db_items);
374
375 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378 return Ok(items)
379 }
380 }
381
382 if let Some((in_memory_chain, in_memory_range)) = in_memory {
383 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384 debug_assert!(num == block.number());
385 if let Some(item) = map_block_state_item(block, &mut predicate) {
386 items.push(item);
387 } else {
388 break
389 }
390 }
391 }
392
393 Ok(items)
394 }
395
396 fn block_state_provider_ref(
398 &self,
399 state: &BlockState<N::Primitives>,
400 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401 let anchor_hash = state.anchor().hash;
402 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405 }
406
407 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413 &self,
414 range: impl RangeBounds<BlockNumber>,
415 fetch_from_db: S,
416 fetch_from_block_state: M,
417 ) -> ProviderResult<Vec<R>>
418 where
419 S: FnOnce(
420 &DatabaseProviderRO<N::DB, N>,
421 RangeInclusive<TxNumber>,
422 ) -> ProviderResult<Vec<R>>,
423 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424 {
425 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426 let provider = &self.storage_provider;
427
428 let last_database_block_number = in_mem_chain
431 .last()
432 .map(|b| Ok(b.anchor().number))
433 .unwrap_or_else(|| provider.last_block_number())?;
434
435 let last_block_body_index = provider
438 .block_body_indices(last_database_block_number)?
439 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442 let (start, end) = self.convert_range_bounds(range, || {
443 in_mem_chain
444 .iter()
445 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446 .sum::<u64>() +
447 last_block_body_index.last_tx_num()
448 });
449
450 if start > end {
451 return Ok(vec![])
452 }
453
454 let mut tx_range = start..=end;
455
456 if *tx_range.end() < in_memory_tx_num {
459 return fetch_from_db(provider, tx_range);
460 }
461
462 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464 if *tx_range.start() < in_memory_tx_num {
466 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469 tx_range = in_memory_tx_num..=*tx_range.end();
471
472 items.extend(fetch_from_db(provider, db_range)?);
473 }
474
475 for block_state in in_mem_chain.iter().rev() {
477 let block_tx_count =
478 block_state.block_ref().recovered_block().body().transactions().len();
479 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484 in_memory_tx_num += block_tx_count as u64;
485 continue
486 }
487
488 let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491 items.extend(fetch_from_block_state(
492 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493 block_state,
494 )?);
495
496 in_memory_tx_num += block_tx_count as u64;
497
498 if in_memory_tx_num > *tx_range.end() {
500 break
501 }
502
503 tx_range = in_memory_tx_num..=*tx_range.end();
505 }
506
507 Ok(items)
508 }
509
510 fn get_in_memory_or_storage_by_tx<S, M, R>(
513 &self,
514 id: HashOrNumber,
515 fetch_from_db: S,
516 fetch_from_block_state: M,
517 ) -> ProviderResult<Option<R>>
518 where
519 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521 {
522 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523 let provider = &self.storage_provider;
524
525 let last_database_block_number = in_mem_chain
528 .last()
529 .map(|b| Ok(b.anchor().number))
530 .unwrap_or_else(|| provider.last_block_number())?;
531
532 let last_block_body_index = provider
535 .block_body_indices(last_database_block_number)?
536 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539 if let HashOrNumber::Number(id) = id {
542 if id < in_memory_tx_num {
543 return fetch_from_db(provider)
544 }
545 }
546
547 for block_state in in_mem_chain.iter().rev() {
549 let executed_block = block_state.block_ref();
550 let block = executed_block.recovered_block();
551
552 for tx_index in 0..block.body().transactions().len() {
553 match id {
554 HashOrNumber::Hash(tx_hash) => {
555 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557 }
558 }
559 HashOrNumber::Number(id) => {
560 if id == in_memory_tx_num {
561 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562 }
563 }
564 }
565
566 in_memory_tx_num += 1;
567 }
568 }
569
570 if let HashOrNumber::Hash(_) = id {
572 return fetch_from_db(provider)
573 }
574
575 Ok(None)
576 }
577
578 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580 &self,
581 id: BlockHashOrNumber,
582 fetch_from_db: S,
583 fetch_from_block_state: M,
584 ) -> ProviderResult<R>
585 where
586 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588 {
589 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590 return fetch_from_block_state(block_state)
591 }
592 fetch_from_db(&self.storage_provider)
593 }
594
595 pub(crate) fn into_state_provider_at_block_hash(
597 self,
598 block_hash: BlockHash,
599 ) -> ProviderResult<Box<dyn StateProvider>> {
600 let Self { storage_provider, head_block, .. } = self;
601 let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
602 let block_number = storage_provider
603 .block_number(block_hash)?
604 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605 storage_provider.try_into_history_at_block(block_number)
606 };
607 if let Some(Some(block_state)) =
608 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609 {
610 let anchor_hash = block_state.anchor().hash;
611 let latest_historical = into_history_at_block_hash(anchor_hash)?;
612 return Ok(Box::new(block_state.state_provider(latest_historical)));
613 }
614 into_history_at_block_hash(block_hash)
615 }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619 #[inline]
628 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629 let latest = self.best_block_number()?;
630 if block_number > latest {
631 Err(ProviderError::HeaderNotFound(block_number.into()))
632 } else {
633 Ok(())
634 }
635 }
636}
637
638impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
639 type Primitives = N::Primitives;
640}
641
642impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
643 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
644 self.storage_provider.static_file_provider()
645 }
646}
647
648impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
649 type Header = HeaderTy<N>;
650
651 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
652 self.get_in_memory_or_storage_by_block(
653 (*block_hash).into(),
654 |db_provider| db_provider.header(block_hash),
655 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
656 )
657 }
658
659 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
660 self.get_in_memory_or_storage_by_block(
661 num.into(),
662 |db_provider| db_provider.header_by_number(num),
663 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664 )
665 }
666
667 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
668 if let Some(num) = self.block_number(*hash)? {
669 self.header_td_by_number(num)
670 } else {
671 Ok(None)
672 }
673 }
674
675 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
676 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
677 {
678 if let Some(last_finalized_num_hash) =
685 self.canonical_in_memory_state.get_finalized_num_hash()
686 {
687 last_finalized_num_hash.number
688 } else {
689 self.last_block_number()?
690 }
691 } else {
692 number
694 };
695 self.storage_provider.header_td_by_number(number)
696 }
697
698 fn headers_range(
699 &self,
700 range: impl RangeBounds<BlockNumber>,
701 ) -> ProviderResult<Vec<Self::Header>> {
702 self.get_in_memory_or_storage_by_block_range_while(
703 range,
704 |db_provider, range, _| db_provider.headers_range(range),
705 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
706 |_| true,
707 )
708 }
709
710 fn sealed_header(
711 &self,
712 number: BlockNumber,
713 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
714 self.get_in_memory_or_storage_by_block(
715 number.into(),
716 |db_provider| db_provider.sealed_header(number),
717 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
718 )
719 }
720
721 fn sealed_headers_range(
722 &self,
723 range: impl RangeBounds<BlockNumber>,
724 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
725 self.get_in_memory_or_storage_by_block_range_while(
726 range,
727 |db_provider, range, _| db_provider.sealed_headers_range(range),
728 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
729 |_| true,
730 )
731 }
732
733 fn sealed_headers_while(
734 &self,
735 range: impl RangeBounds<BlockNumber>,
736 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
737 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
738 self.get_in_memory_or_storage_by_block_range_while(
739 range,
740 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
741 |block_state, predicate| {
742 let header = block_state.block_ref().recovered_block().sealed_header();
743 predicate(header).then(|| header.clone())
744 },
745 predicate,
746 )
747 }
748}
749
750impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
751 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
752 self.get_in_memory_or_storage_by_block(
753 number.into(),
754 |db_provider| db_provider.block_hash(number),
755 |block_state| Ok(Some(block_state.hash())),
756 )
757 }
758
759 fn canonical_hashes_range(
760 &self,
761 start: BlockNumber,
762 end: BlockNumber,
763 ) -> ProviderResult<Vec<B256>> {
764 self.get_in_memory_or_storage_by_block_range_while(
765 start..end,
766 |db_provider, inclusive_range, _| {
767 db_provider
768 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
769 },
770 |block_state, _| Some(block_state.hash()),
771 |_| true,
772 )
773 }
774}
775
776impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
777 fn chain_info(&self) -> ProviderResult<ChainInfo> {
778 let best_number = self.best_block_number()?;
779 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
780 }
781
782 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
783 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
784 }
785
786 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
787 self.storage_provider.last_block_number()
788 }
789
790 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
791 self.get_in_memory_or_storage_by_block(
792 hash.into(),
793 |db_provider| db_provider.block_number(hash),
794 |block_state| Ok(Some(block_state.number())),
795 )
796 }
797}
798
799impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
800 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
801 Ok(self.canonical_in_memory_state.pending_block_num_hash())
802 }
803
804 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
805 Ok(self.canonical_in_memory_state.get_safe_num_hash())
806 }
807
808 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
809 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
810 }
811}
812
813impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
814 type Block = BlockTy<N>;
815
816 fn find_block_by_hash(
817 &self,
818 hash: B256,
819 source: BlockSource,
820 ) -> ProviderResult<Option<Self::Block>> {
821 if matches!(source, BlockSource::Canonical | BlockSource::Any) {
822 if let Some(block) = self.get_in_memory_or_storage_by_block(
823 hash.into(),
824 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
825 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
826 )? {
827 return Ok(Some(block))
828 }
829 }
830
831 if matches!(source, BlockSource::Pending | BlockSource::Any) {
832 return Ok(self
833 .canonical_in_memory_state
834 .pending_block()
835 .filter(|b| b.hash() == hash)
836 .map(|b| b.into_block()))
837 }
838
839 Ok(None)
840 }
841
842 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
843 self.get_in_memory_or_storage_by_block(
844 id,
845 |db_provider| db_provider.block(id),
846 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
847 )
848 }
849
850 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
851 Ok(self.canonical_in_memory_state.pending_recovered_block())
852 }
853
854 fn pending_block_and_receipts(
855 &self,
856 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
857 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
858 }
859
860 fn recovered_block(
867 &self,
868 id: BlockHashOrNumber,
869 transaction_kind: TransactionVariant,
870 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
871 self.get_in_memory_or_storage_by_block(
872 id,
873 |db_provider| db_provider.recovered_block(id, transaction_kind),
874 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
875 )
876 }
877
878 fn sealed_block_with_senders(
879 &self,
880 id: BlockHashOrNumber,
881 transaction_kind: TransactionVariant,
882 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
883 self.get_in_memory_or_storage_by_block(
884 id,
885 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
886 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
887 )
888 }
889
890 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
891 self.get_in_memory_or_storage_by_block_range_while(
892 range,
893 |db_provider, range, _| db_provider.block_range(range),
894 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
895 |_| true,
896 )
897 }
898
899 fn block_with_senders_range(
900 &self,
901 range: RangeInclusive<BlockNumber>,
902 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
903 self.get_in_memory_or_storage_by_block_range_while(
904 range,
905 |db_provider, range, _| db_provider.block_with_senders_range(range),
906 |block_state, _| Some(block_state.block().recovered_block().clone()),
907 |_| true,
908 )
909 }
910
911 fn recovered_block_range(
912 &self,
913 range: RangeInclusive<BlockNumber>,
914 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
915 self.get_in_memory_or_storage_by_block_range_while(
916 range,
917 |db_provider, range, _| db_provider.recovered_block_range(range),
918 |block_state, _| Some(block_state.block().recovered_block().clone()),
919 |_| true,
920 )
921 }
922}
923
924impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
925 type Transaction = TxTy<N>;
926
927 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
928 self.get_in_memory_or_storage_by_tx(
929 tx_hash.into(),
930 |db_provider| db_provider.transaction_id(tx_hash),
931 |_, tx_number, _| Ok(Some(tx_number)),
932 )
933 }
934
935 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
936 self.get_in_memory_or_storage_by_tx(
937 id.into(),
938 |provider| provider.transaction_by_id(id),
939 |tx_index, _, block_state| {
940 Ok(block_state
941 .block_ref()
942 .recovered_block()
943 .body()
944 .transactions()
945 .get(tx_index)
946 .cloned())
947 },
948 )
949 }
950
951 fn transaction_by_id_unhashed(
952 &self,
953 id: TxNumber,
954 ) -> ProviderResult<Option<Self::Transaction>> {
955 self.get_in_memory_or_storage_by_tx(
956 id.into(),
957 |provider| provider.transaction_by_id_unhashed(id),
958 |tx_index, _, block_state| {
959 Ok(block_state
960 .block_ref()
961 .recovered_block()
962 .body()
963 .transactions()
964 .get(tx_index)
965 .cloned())
966 },
967 )
968 }
969
970 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
971 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
972 return Ok(Some(tx))
973 }
974
975 self.storage_provider.transaction_by_hash(hash)
976 }
977
978 fn transaction_by_hash_with_meta(
979 &self,
980 tx_hash: TxHash,
981 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
982 if let Some((tx, meta)) =
983 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
984 {
985 return Ok(Some((tx, meta)))
986 }
987
988 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
989 }
990
991 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
992 self.get_in_memory_or_storage_by_tx(
993 id.into(),
994 |provider| provider.transaction_block(id),
995 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
996 )
997 }
998
999 fn transactions_by_block(
1000 &self,
1001 id: BlockHashOrNumber,
1002 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1003 self.get_in_memory_or_storage_by_block(
1004 id,
1005 |provider| provider.transactions_by_block(id),
1006 |block_state| {
1007 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
1008 },
1009 )
1010 }
1011
1012 fn transactions_by_block_range(
1013 &self,
1014 range: impl RangeBounds<BlockNumber>,
1015 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1016 self.get_in_memory_or_storage_by_block_range_while(
1017 range,
1018 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1019 |block_state, _| {
1020 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1021 },
1022 |_| true,
1023 )
1024 }
1025
1026 fn transactions_by_tx_range(
1027 &self,
1028 range: impl RangeBounds<TxNumber>,
1029 ) -> ProviderResult<Vec<Self::Transaction>> {
1030 self.get_in_memory_or_storage_by_tx_range(
1031 range,
1032 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1033 |index_range, block_state| {
1034 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1035 .to_vec())
1036 },
1037 )
1038 }
1039
1040 fn senders_by_tx_range(
1041 &self,
1042 range: impl RangeBounds<TxNumber>,
1043 ) -> ProviderResult<Vec<Address>> {
1044 self.get_in_memory_or_storage_by_tx_range(
1045 range,
1046 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1047 |index_range, block_state| {
1048 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1049 },
1050 )
1051 }
1052
1053 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1054 self.get_in_memory_or_storage_by_tx(
1055 id.into(),
1056 |provider| provider.transaction_sender(id),
1057 |tx_index, _, block_state| {
1058 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1059 },
1060 )
1061 }
1062}
1063
1064impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1065 type Receipt = ReceiptTy<N>;
1066
1067 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1068 self.get_in_memory_or_storage_by_tx(
1069 id.into(),
1070 |provider| provider.receipt(id),
1071 |tx_index, _, block_state| {
1072 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1073 },
1074 )
1075 }
1076
1077 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1078 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1079 let executed_block = block_state.block_ref();
1080 let block = executed_block.recovered_block();
1081 let receipts = block_state.executed_block_receipts();
1082
1083 debug_assert_eq!(
1085 block.body().transactions().len(),
1086 receipts.len(),
1087 "Mismatch between transaction and receipt count"
1088 );
1089
1090 if let Some(tx_index) =
1091 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1092 {
1093 return Ok(receipts.get(tx_index).cloned());
1095 }
1096 }
1097
1098 self.storage_provider.receipt_by_hash(hash)
1099 }
1100
1101 fn receipts_by_block(
1102 &self,
1103 block: BlockHashOrNumber,
1104 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1105 self.get_in_memory_or_storage_by_block(
1106 block,
1107 |db_provider| db_provider.receipts_by_block(block),
1108 |block_state| Ok(Some(block_state.executed_block_receipts())),
1109 )
1110 }
1111
1112 fn receipts_by_tx_range(
1113 &self,
1114 range: impl RangeBounds<TxNumber>,
1115 ) -> ProviderResult<Vec<Self::Receipt>> {
1116 self.get_in_memory_or_storage_by_tx_range(
1117 range,
1118 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1119 |index_range, block_state| {
1120 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1121 },
1122 )
1123 }
1124
1125 fn receipts_by_block_range(
1126 &self,
1127 block_range: RangeInclusive<BlockNumber>,
1128 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1129 self.storage_provider.receipts_by_block_range(block_range)
1130 }
1131}
1132
1133impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1134 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1135 match block {
1136 BlockId::Hash(rpc_block_hash) => {
1137 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1138 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1139 if let Some(state) = self
1140 .head_block
1141 .as_ref()
1142 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1143 {
1144 receipts = Some(state.executed_block_receipts());
1145 }
1146 }
1147 Ok(receipts)
1148 }
1149 BlockId::Number(num_tag) => match num_tag {
1150 BlockNumberOrTag::Pending => Ok(self
1151 .canonical_in_memory_state
1152 .pending_state()
1153 .map(|block_state| block_state.executed_block_receipts())),
1154 _ => {
1155 if let Some(num) = self.convert_block_number(num_tag)? {
1156 self.receipts_by_block(num.into())
1157 } else {
1158 Ok(None)
1159 }
1160 }
1161 },
1162 }
1163 }
1164}
1165
1166impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1167 fn block_body_indices(
1168 &self,
1169 number: BlockNumber,
1170 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1171 self.get_in_memory_or_storage_by_block(
1172 number.into(),
1173 |db_provider| db_provider.block_body_indices(number),
1174 |block_state| {
1175 let last_storage_block_number = block_state.anchor().number;
1177 let mut stored_indices = self
1178 .storage_provider
1179 .block_body_indices(last_storage_block_number)?
1180 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1181
1182 stored_indices.first_tx_num = stored_indices.next_tx_num();
1184 stored_indices.tx_count = 0;
1185
1186 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1188 let block_tx_count =
1189 state.block_ref().recovered_block().body().transactions().len() as u64;
1190 if state.block_ref().recovered_block().number() == number {
1191 stored_indices.tx_count = block_tx_count;
1192 } else {
1193 stored_indices.first_tx_num += block_tx_count;
1194 }
1195 }
1196
1197 Ok(Some(stored_indices))
1198 },
1199 )
1200 }
1201
1202 fn block_body_indices_range(
1203 &self,
1204 range: RangeInclusive<BlockNumber>,
1205 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1206 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1207 }
1208}
1209
1210impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1211 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1212 self.storage_provider.get_stage_checkpoint(id)
1213 }
1214
1215 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1216 self.storage_provider.get_stage_checkpoint_progress(id)
1217 }
1218
1219 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1220 self.storage_provider.get_all_checkpoints()
1221 }
1222}
1223
1224impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1225 fn get_prune_checkpoint(
1226 &self,
1227 segment: PruneSegment,
1228 ) -> ProviderResult<Option<PruneCheckpoint>> {
1229 self.storage_provider.get_prune_checkpoint(segment)
1230 }
1231
1232 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1233 self.storage_provider.get_prune_checkpoints()
1234 }
1235}
1236
1237impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1238 type ChainSpec = N::ChainSpec;
1239
1240 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1241 ChainSpecProvider::chain_spec(&self.storage_provider)
1242 }
1243}
1244
1245impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1246 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1247 match id {
1248 BlockId::Number(num) => self.block_by_number_or_tag(num),
1249 BlockId::Hash(hash) => {
1250 if Some(true) == hash.require_canonical {
1255 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1257 } else {
1258 self.block_by_hash(hash.block_hash)
1259 }
1260 }
1261 }
1262 }
1263
1264 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1265 Ok(match id {
1266 BlockNumberOrTag::Latest => {
1267 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1268 }
1269 BlockNumberOrTag::Finalized => {
1270 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1271 }
1272 BlockNumberOrTag::Safe => {
1273 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1274 }
1275 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1276 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1277
1278 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1279 })
1280 }
1281
1282 fn sealed_header_by_number_or_tag(
1283 &self,
1284 id: BlockNumberOrTag,
1285 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1286 match id {
1287 BlockNumberOrTag::Latest => {
1288 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1289 }
1290 BlockNumberOrTag::Finalized => {
1291 Ok(self.canonical_in_memory_state.get_finalized_header())
1292 }
1293 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1294 BlockNumberOrTag::Earliest => self
1295 .header_by_number(self.earliest_block_number()?)?
1296 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1297 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1298 BlockNumberOrTag::Number(num) => self
1299 .header_by_number(num)?
1300 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1301 }
1302 }
1303
1304 fn sealed_header_by_id(
1305 &self,
1306 id: BlockId,
1307 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1308 Ok(match id {
1309 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1310 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1311 })
1312 }
1313
1314 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1315 Ok(match id {
1316 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1317 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1318 })
1319 }
1320}
1321
1322impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1323 fn storage_changeset(
1324 &self,
1325 block_number: BlockNumber,
1326 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1327 if let Some(state) =
1328 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1329 {
1330 let changesets = state
1331 .block()
1332 .execution_output
1333 .bundle
1334 .reverts
1335 .clone()
1336 .to_plain_state_reverts()
1337 .storage
1338 .into_iter()
1339 .flatten()
1340 .flat_map(|revert: PlainStorageRevert| {
1341 revert.storage_revert.into_iter().map(move |(key, value)| {
1342 (
1343 BlockNumberAddress((block_number, revert.address)),
1344 StorageEntry { key: key.into(), value: value.to_previous_value() },
1345 )
1346 })
1347 })
1348 .collect();
1349 Ok(changesets)
1350 } else {
1351 let storage_history_exists = self
1355 .storage_provider
1356 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1357 .and_then(|checkpoint| {
1358 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1363 })
1364 .unwrap_or(true);
1365
1366 if !storage_history_exists {
1367 return Err(ProviderError::StateAtBlockPruned(block_number))
1368 }
1369
1370 self.storage_provider.storage_changeset(block_number)
1371 }
1372 }
1373}
1374
1375impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1376 fn account_block_changeset(
1377 &self,
1378 block_number: BlockNumber,
1379 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1380 if let Some(state) =
1381 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1382 {
1383 let changesets = state
1384 .block_ref()
1385 .execution_output
1386 .bundle
1387 .reverts
1388 .clone()
1389 .to_plain_state_reverts()
1390 .accounts
1391 .into_iter()
1392 .flatten()
1393 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1394 .collect();
1395 Ok(changesets)
1396 } else {
1397 let account_history_exists = self
1401 .storage_provider
1402 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1403 .and_then(|checkpoint| {
1404 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1409 })
1410 .unwrap_or(true);
1411
1412 if !account_history_exists {
1413 return Err(ProviderError::StateAtBlockPruned(block_number))
1414 }
1415
1416 self.storage_provider.account_block_changeset(block_number)
1417 }
1418 }
1419}
1420
1421impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1422 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1424 let state_provider = self.latest_ref()?;
1426 state_provider.basic_account(address)
1427 }
1428}
1429
1430impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1431 type Receipt = ReceiptTy<N>;
1432
1433 fn get_state(
1443 &self,
1444 block: BlockNumber,
1445 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1446 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1447 let state = state.block_ref().execution_outcome().clone();
1448 Ok(Some(state))
1449 } else {
1450 Self::get_state(self, block..=block)
1451 }
1452 }
1453}
1454
#[cfg(test)]
mod tests {
    use crate::{
        providers::blockchain_provider::BlockchainProvider,
        test_utils::create_test_provider_factory, BlockWriter,
    };
    use alloy_eips::BlockHashOrNumber;
    use alloy_primitives::B256;
    use itertools::Itertools;
    use rand::Rng;
    use reth_chain_state::{
        ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, NewCanonicalChain,
    };
    use reth_db_api::models::AccountBeforeTx;
    use reth_ethereum_primitives::Block;
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use revm_database::BundleState;
    use std::{
        ops::{Bound, Range, RangeBounds},
        sync::Arc,
    };

    /// Number of blocks written to the database by tests that use [`random_blocks`].
    const TEST_BLOCKS_COUNT: usize = 5;

    /// Generates `database_blocks + in_memory_blocks` consecutive random blocks starting at
    /// block 0 and splits them into a database part and an in-memory part.
    ///
    /// `tx_count` is translated into the half-open `Range<u8>` that [`BlockRangeParams`]
    /// expects, honouring the bound kinds:
    ///
    /// * an inclusive end `..=n` becomes `..n + 1`, an exclusive end `..n` stays `..n`;
    /// * an exclusive start is advanced by one;
    /// * an unbounded start is `0` and an unbounded end is `u8::MAX`.
    ///
    /// The `+ 1` adjustments saturate at `u8::MAX` instead of overflowing.
    fn random_blocks(
        rng: &mut impl Rng,
        database_blocks: usize,
        in_memory_blocks: usize,
        requests_count: Option<Range<u8>>,
        withdrawals_count: Option<Range<u8>>,
        tx_count: impl RangeBounds<u8>,
    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
        // Number of the last generated block (numbering starts at 0).
        let block_range = (database_blocks + in_memory_blocks - 1) as u64;

        let tx_start = match tx_count.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.saturating_add(1),
            Bound::Unbounded => u8::MIN,
        };
        let tx_end = match tx_count.end_bound() {
            Bound::Included(&n) => n.saturating_add(1),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => u8::MAX,
        };

        let blocks = random_block_range(
            rng,
            0..=block_range,
            BlockRangeParams {
                parent: Some(B256::ZERO),
                tx_count: tx_start..tx_end,
                requests_count,
                withdrawals_count,
            },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
        (database_blocks.to_vec(), in_memory_blocks.to_vec())
    }

    /// Checks `find_block_by_hash` for database, in-memory and pending blocks across the
    /// different [`BlockSource`] values.
    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate the blocks and split them into a database and an in-memory part.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Write the first part to the database.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // The in-memory block has not been added yet, so no source can find it.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Add the first in-memory block to the canonical in-memory chain.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlockWithTrieUpdates::new(
                Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                Default::default(),
                Default::default(),
                ExecutedTrieUpdates::empty(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // Re-create the consistent provider after the chain update.
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The database block is found as well.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into_block())
        );

        // No pending block has been set so far.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Set the last in-memory block as the pending block.
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    last_in_mem_block.clone(),
                    Default::default(),
                )),
                execution_output: Default::default(),
                hashed_state: Default::default(),
            },
            trie: ExecutedTrieUpdates::empty(),
        });

        // The pending block is now found through the pending source.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone_block())
        );

        Ok(())
    }

    /// Checks `block` lookups by hash and by number for database and in-memory blocks.
    #[test]
    fn test_block_reader_block() -> eyre::Result<()> {
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate the blocks and split them into a database and an in-memory part.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Write the first part to the database.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let first_db_block = database_blocks.first().unwrap();

        // The in-memory block has not been added yet, so neither lookup finds it.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            None
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            None
        );

        // Add the first in-memory block to the canonical in-memory chain.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlockWithTrieUpdates::new(
                Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                Default::default(),
                Default::default(),
                ExecutedTrieUpdates::empty(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);

        // Re-create the consistent provider after the chain update.
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The database block is found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
            Some(first_db_block.clone().into_block())
        );

        Ok(())
    }

    /// Checks that `account_block_changeset` returns the expected changesets for the last
    /// database block and for the first in-memory block.
    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        // Changesets for the database blocks, then for the in-memory blocks on top of the
        // resulting database state.
        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        // Persist the database blocks together with their state and reverts.
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets
                        .iter()
                        .map(|block_changesets| {
                            block_changesets.iter().map(|(address, account, _)| {
                                (*address, Some(Some((*account).into())), [])
                            })
                        })
                        .collect::<Vec<_>>(),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        // Commit the single in-memory block with its execution outcome.
        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlockWithTrieUpdates::new(
                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
                        Arc::new(ExecutionOutcome {
                            bundle: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            first_block: first_in_memory_block,
                            ..Default::default()
                        }),
                        Default::default(),
                        ExecutedTrieUpdates::empty(),
                    )
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        let consistent_provider = provider.consistent_provider()?;

        // Changeset of the last database block; expected entries are sorted by address.
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .next_back()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // Changeset of the in-memory block; expected entries are sorted by address.
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}