1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12 HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_storage_api::{
27 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
28 StorageChangeSetReader, TryIntoHistoricalStateProvider,
29};
30use reth_storage_errors::provider::ProviderResult;
31use revm_database::states::PlainStorageRevert;
32use std::{
33 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
34 sync::Arc,
35};
36use tracing::trace;
37
/// Provider that combines a read-only database provider with in-memory block state.
///
/// As constructed by `ConsistentProvider::new`, the head block state and the database provider
/// are each obtained once and then reused by every lookup made through this value.
#[derive(Debug)]
#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Read-only database provider obtained from the factory at construction time.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// In-memory head block state captured at construction time, if there was one.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// Handle to the canonical in-memory state, queried on demand (pending, safe and finalized
    /// block information).
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
54
55impl<N: ProviderNodeTypes> ConsistentProvider<N> {
    /// Creates a provider from a database provider factory and the canonical in-memory state.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `database_provider_ro` if the read-only database provider
    /// cannot be created.
    pub fn new(
        storage_provider_factory: ProviderFactory<N>,
        state: CanonicalInMemoryState<N::Primitives>,
    ) -> ProviderResult<Self> {
        // NOTE(review): the in-memory head is captured *before* the database provider is
        // opened. The ordering looks deliberate (both views are meant to line up) — confirm
        // before reordering these two statements.
        let head_block = state.head_state();
        let storage_provider = storage_provider_factory.database_provider_ro()?;
        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
    }
76
77 fn convert_range_bounds<T>(
79 &self,
80 range: impl RangeBounds<T>,
81 end_unbounded: impl FnOnce() -> T,
82 ) -> (T, T)
83 where
84 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
85 {
86 let start = match range.start_bound() {
87 Bound::Included(&n) => n,
88 Bound::Excluded(&n) => n + T::from(1u8),
89 Bound::Unbounded => T::from(0u8),
90 };
91
92 let end = match range.end_bound() {
93 Bound::Included(&n) => n,
94 Bound::Excluded(&n) => n - T::from(1u8),
95 Bound::Unbounded => end_unbounded(),
96 };
97
98 (start, end)
99 }
100
101 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
103 trace!(target: "providers::blockchain", "Getting latest block state provider");
104
105 if let Some(state) = &self.head_block {
107 trace!(target: "providers::blockchain", "Using head state for latest state provider");
108 Ok(self.block_state_provider_ref(state)?.boxed())
109 } else {
110 trace!(target: "providers::blockchain", "Using database state for latest state provider");
111 Ok(self.storage_provider.latest())
112 }
113 }
114
115 fn history_by_block_hash_ref<'a>(
116 &'a self,
117 block_hash: BlockHash,
118 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
119 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
120
121 self.get_in_memory_or_storage_by_block(
122 block_hash.into(),
123 |_| self.storage_provider.history_by_block_hash(block_hash),
124 |block_state| {
125 let state_provider = self.block_state_provider_ref(block_state)?;
126 Ok(Box::new(state_provider))
127 },
128 )
129 }
130
131 fn state_by_block_number_ref<'a>(
133 &'a self,
134 number: BlockNumber,
135 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
136 let hash =
137 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
138 self.history_by_block_hash_ref(hash)
139 }
140
141 pub fn get_state(
145 &self,
146 range: RangeInclusive<BlockNumber>,
147 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
148 if range.is_empty() {
149 return Ok(None)
150 }
151 let start_block_number = *range.start();
152 let end_block_number = *range.end();
153
154 let mut block_bodies = Vec::new();
156 for block_num in range.clone() {
157 let block_body = self
158 .block_body_indices(block_num)?
159 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
160 block_bodies.push((block_num, block_body))
161 }
162
163 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
165 else {
166 return Ok(None)
167 };
168 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
169 return Ok(None)
170 };
171
172 let mut account_changeset = Vec::new();
173 for block_num in range.clone() {
174 let changeset =
175 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
176 account_changeset.extend(changeset);
177 }
178
179 let mut storage_changeset = Vec::new();
180 for block_num in range {
181 let changeset = self.storage_changeset(block_num)?;
182 storage_changeset.extend(changeset);
183 }
184
185 let (state, reverts) =
186 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
187
188 let mut receipt_iter =
189 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
190
191 let mut receipts = Vec::with_capacity(block_bodies.len());
192 for (_, block_body) in block_bodies {
194 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
195 for tx_num in block_body.tx_num_range() {
196 let receipt = receipt_iter
197 .next()
198 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
199 block_receipts.push(receipt);
200 }
201 receipts.push(block_receipts);
202 }
203
204 Ok(Some(ExecutionOutcome::new_init(
205 state,
206 reverts,
207 Vec::new(),
209 receipts,
210 start_block_number,
211 Vec::new(),
212 )))
213 }
214
    /// Builds the bundle state and the per-block reverts from account and storage changesets.
    ///
    /// `state` maps each touched address to `(old_info, new_info, storage)`, where `storage`
    /// maps a slot key to `(old_value, new_value)`. "New" values are read from the state
    /// provider at `block_range_end`; "old" values come from the changesets.
    ///
    /// Both changesets are walked in reverse, and an already present entry has its "old" half
    /// overwritten, so the value left in place is the one from the earliest element of the
    /// input vector.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the state provider and from its account / storage reads.
    fn populate_bundle_state(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        block_range_end: BlockNumber,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
        let mut state: BundleStateInit = HashMap::default();
        let mut reverts: RevertsInit = HashMap::default();
        let state_provider = self.state_by_block_number_ref(block_range_end)?;

        // Account changes, last element first.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    // First time this address is seen: pair the old info with the account as
                    // it is at `block_range_end`.
                    let new_info = state_provider.basic_account(&address)?;
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // An earlier change exists: replace the recorded old account info.
                    entry.get_mut().0 = old_info;
                }
            }
            // Record the old info as the revert for this block and address.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // Storage changes, last element first.
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // Reuse the account entry, or create one whose old and new info are both the
            // account as it is at `block_range_end`.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = state_provider.basic_account(&address)?;
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // Same old/new bookkeeping as above, per storage slot.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage_value =
                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
                    entry.insert((old_storage.value, new_storage_value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            // Append the old slot value to the reverts of this block and address.
            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
280
    /// Collects one item per block over `range`, taking the lower part of the range from the
    /// database and the upper part from the in-memory chain captured in `head_block`.
    ///
    /// * `fetch_db_range` loads the items for the database part of the range.
    /// * `map_block_state_item` produces the item of one in-memory block; returning `None`
    ///   stops the collection.
    /// * `predicate` is handed by mutable reference to both callbacks.
    ///
    /// An unbounded end resolves to the number of the first in-memory chain entry (which this
    /// code treats as the highest block) or, without an in-memory chain, to the database's last
    /// block number (zero if that lookup fails).
    ///
    /// Returns an empty vector when the resolved start is greater than the resolved end. If the
    /// database yields a different number of items than its sub-range spans, the in-memory part
    /// is skipped and the items gathered so far are returned.
    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_db_range: F,
        map_block_state_item: G,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<BlockNumber>,
            &mut P,
        ) -> ProviderResult<Vec<T>>,
        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
        P: FnMut(&T) -> bool,
    {
        // Snapshot of the in-memory chain; `first()` is used as the highest block and `last()`
        // as the lowest one below.
        let mut in_memory_chain =
            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
        let db_provider = &self.storage_provider;

        let (start, end) = self.convert_range_bounds(range, || {
            // Only evaluated for an unbounded end.
            in_memory_chain
                .first()
                .map(|b| b.number())
                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
        });

        if start > end {
            return Ok(vec![])
        }

        // Split the request into a database range and an in-memory range.
        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
            Some(lowest_memory_block) if lowest_memory_block <= end => {
                // The chain is non-empty in this arm, so `first()` cannot be `None`.
                let highest_memory_block =
                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

                // Block numbers served from memory: the overlap of the request and the chain.
                let in_memory_range =
                    lowest_memory_block.max(start)..=end.min(highest_memory_block);

                // When `start` lies above the lowest in-memory block, drop that many entries
                // from the tail of the vector (the lowest blocks).
                in_memory_chain.truncate(
                    in_memory_chain
                        .len()
                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
                );

                // Anything below the lowest in-memory block has to come from the database.
                let storage_range =
                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

                (Some((in_memory_chain, in_memory_range)), storage_range)
            }
            _ => {
                // No overlap with the in-memory chain: release it and read everything from
                // the database.
                drop(in_memory_chain);

                (None, Some(start..=end))
            }
        };

        let mut items = Vec::with_capacity((end - start + 1) as usize);

        if let Some(storage_range) = storage_range {
            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
            items.append(&mut db_items);

            // A count that differs from the size of the database range ends the collection
            // here; the in-memory part is not consulted.
            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
                return Ok(items)
            }
        }

        if let Some((in_memory_chain, in_memory_range)) = in_memory {
            // Walk the chain in reverse (lowest block first), paired with the block numbers.
            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
                debug_assert!(num == block.number());
                if let Some(item) = map_block_state_item(block, &mut predicate) {
                    items.push(item);
                } else {
                    break
                }
            }
        }

        Ok(items)
    }
393
394 fn block_state_provider_ref(
396 &self,
397 state: &BlockState<N::Primitives>,
398 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
399 let anchor_hash = state.anchor().hash;
400 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
401 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
402 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
403 }
404
    /// Collects items for a range of transaction numbers, reading the lower part from the
    /// database and the upper part from the in-memory chain captured in `head_block`.
    ///
    /// * `fetch_from_db` receives the inclusive transaction-number range to load from storage.
    /// * `fetch_from_block_state` receives an inclusive range of transaction *indices* within
    ///   one in-memory block, together with that block.
    ///
    /// An unbounded end resolves to the last transaction number of the last database block plus
    /// the number of transactions in all in-memory blocks. Returns an empty vector when the
    /// resolved start is greater than the resolved end.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if the last database block has no
    /// body indices, and propagates errors from the callbacks and the database.
    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_from_db: S,
        fetch_from_block_state: M,
    ) -> ProviderResult<Vec<R>>
    where
        S: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<TxNumber>,
        ) -> ProviderResult<Vec<R>>,
        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
    {
        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
        let provider = &self.storage_provider;

        // Block that the in-memory chain builds on: the anchor of the last chain entry, or the
        // database's last block when nothing is held in memory.
        let last_database_block_number = in_mem_chain
            .last()
            .map(|b| Ok(b.anchor().number))
            .unwrap_or_else(|| provider.last_block_number())?;

        // The transaction number following that block is the first in-memory transaction.
        let last_block_body_index = provider
            .block_body_indices(last_database_block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
        let mut in_memory_tx_num = last_block_body_index.next_tx_num();

        let (start, end) = self.convert_range_bounds(range, || {
            // Only evaluated for an unbounded end.
            in_mem_chain
                .iter()
                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
                .sum::<u64>() +
                last_block_body_index.last_tx_num()
        });

        if start > end {
            return Ok(vec![])
        }

        let mut tx_range = start..=end;

        // Entire range lies below the first in-memory transaction: database only.
        if *tx_range.end() < in_memory_tx_num {
            return fetch_from_db(provider, tx_range);
        }

        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

        // Range straddles both sources: load the database part first.
        if *tx_range.start() < in_memory_tx_num {
            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

            // What remains starts at the first in-memory transaction.
            tx_range = in_memory_tx_num..=*tx_range.end();

            items.extend(fetch_from_db(provider, db_range)?);
        }

        // Walk the chain in reverse; `in_memory_tx_num` tracks the first transaction number of
        // the block being visited.
        for block_state in in_mem_chain.iter().rev() {
            let block_tx_count =
                block_state.block_ref().recovered_block().body().transactions().len();
            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

            // The requested start is past this block: skip it.
            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
                in_memory_tx_num += block_tx_count as u64;
                continue
            }

            // Offset of the first requested transaction inside this block.
            let skip = (tx_range.start() - in_memory_tx_num) as usize;

            // Take as many transactions as are both requested and present in this block.
            items.extend(fetch_from_block_state(
                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
                block_state,
            )?);

            in_memory_tx_num += block_tx_count as u64;

            // Done once the running number has moved past the requested end.
            if in_memory_tx_num > *tx_range.end() {
                break
            }

            // Continue with the next block from its first transaction.
            tx_range = in_memory_tx_num..=*tx_range.end();
        }

        Ok(items)
    }
507
508 fn get_in_memory_or_storage_by_tx<S, M, R>(
511 &self,
512 id: HashOrNumber,
513 fetch_from_db: S,
514 fetch_from_block_state: M,
515 ) -> ProviderResult<Option<R>>
516 where
517 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
518 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
519 {
520 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
521 let provider = &self.storage_provider;
522
523 let last_database_block_number = in_mem_chain
526 .last()
527 .map(|b| Ok(b.anchor().number))
528 .unwrap_or_else(|| provider.last_block_number())?;
529
530 let last_block_body_index = provider
533 .block_body_indices(last_database_block_number)?
534 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
535 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
536
537 if let HashOrNumber::Number(id) = id {
540 if id < in_memory_tx_num {
541 return fetch_from_db(provider)
542 }
543 }
544
545 for block_state in in_mem_chain.iter().rev() {
547 let executed_block = block_state.block_ref();
548 let block = executed_block.recovered_block();
549
550 for tx_index in 0..block.body().transactions().len() {
551 match id {
552 HashOrNumber::Hash(tx_hash) => {
553 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
554 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
555 }
556 }
557 HashOrNumber::Number(id) => {
558 if id == in_memory_tx_num {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 }
563
564 in_memory_tx_num += 1;
565 }
566 }
567
568 if let HashOrNumber::Hash(_) = id {
570 return fetch_from_db(provider)
571 }
572
573 Ok(None)
574 }
575
576 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
578 &self,
579 id: BlockHashOrNumber,
580 fetch_from_db: S,
581 fetch_from_block_state: M,
582 ) -> ProviderResult<R>
583 where
584 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
585 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
586 {
587 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
588 return fetch_from_block_state(block_state)
589 }
590 fetch_from_db(&self.storage_provider)
591 }
592
593 pub(crate) fn into_state_provider_at_block_hash(
595 self,
596 block_hash: BlockHash,
597 ) -> ProviderResult<Box<dyn StateProvider>> {
598 let Self { storage_provider, head_block, .. } = self;
599 let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
600 let block_number = storage_provider
601 .block_number(block_hash)?
602 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
603 storage_provider.try_into_history_at_block(block_number)
604 };
605 if let Some(Some(block_state)) =
606 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
607 {
608 let anchor_hash = block_state.anchor().hash;
609 let latest_historical = into_history_at_block_hash(anchor_hash)?;
610 return Ok(Box::new(block_state.state_provider(latest_historical)));
611 }
612 into_history_at_block_hash(block_hash)
613 }
614}
615
616impl<N: ProviderNodeTypes> ConsistentProvider<N> {
617 #[inline]
626 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
627 let latest = self.best_block_number()?;
628 if block_number > latest {
629 Err(ProviderError::HeaderNotFound(block_number.into()))
630 } else {
631 Ok(())
632 }
633 }
634}
635
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    /// Primitive types are taken from the node type configuration `N`.
    type Primitives = N::Primitives;
}
639
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the underlying storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }
}
645
646impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
647 type Header = HeaderTy<N>;
648
649 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
650 self.get_in_memory_or_storage_by_block(
651 (*block_hash).into(),
652 |db_provider| db_provider.header(block_hash),
653 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
654 )
655 }
656
657 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
658 self.get_in_memory_or_storage_by_block(
659 num.into(),
660 |db_provider| db_provider.header_by_number(num),
661 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
662 )
663 }
664
665 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
666 if let Some(num) = self.block_number(*hash)? {
667 self.header_td_by_number(num)
668 } else {
669 Ok(None)
670 }
671 }
672
673 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
674 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
675 {
676 if let Some(last_finalized_num_hash) =
683 self.canonical_in_memory_state.get_finalized_num_hash()
684 {
685 last_finalized_num_hash.number
686 } else {
687 self.last_block_number()?
688 }
689 } else {
690 number
692 };
693 self.storage_provider.header_td_by_number(number)
694 }
695
696 fn headers_range(
697 &self,
698 range: impl RangeBounds<BlockNumber>,
699 ) -> ProviderResult<Vec<Self::Header>> {
700 self.get_in_memory_or_storage_by_block_range_while(
701 range,
702 |db_provider, range, _| db_provider.headers_range(range),
703 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
704 |_| true,
705 )
706 }
707
708 fn sealed_header(
709 &self,
710 number: BlockNumber,
711 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
712 self.get_in_memory_or_storage_by_block(
713 number.into(),
714 |db_provider| db_provider.sealed_header(number),
715 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
716 )
717 }
718
719 fn sealed_headers_range(
720 &self,
721 range: impl RangeBounds<BlockNumber>,
722 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
723 self.get_in_memory_or_storage_by_block_range_while(
724 range,
725 |db_provider, range, _| db_provider.sealed_headers_range(range),
726 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
727 |_| true,
728 )
729 }
730
731 fn sealed_headers_while(
732 &self,
733 range: impl RangeBounds<BlockNumber>,
734 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
735 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
736 self.get_in_memory_or_storage_by_block_range_while(
737 range,
738 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
739 |block_state, predicate| {
740 let header = block_state.block_ref().recovered_block().sealed_header();
741 predicate(header).then(|| header.clone())
742 },
743 predicate,
744 )
745 }
746}
747
748impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
749 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
750 self.get_in_memory_or_storage_by_block(
751 number.into(),
752 |db_provider| db_provider.block_hash(number),
753 |block_state| Ok(Some(block_state.hash())),
754 )
755 }
756
757 fn canonical_hashes_range(
758 &self,
759 start: BlockNumber,
760 end: BlockNumber,
761 ) -> ProviderResult<Vec<B256>> {
762 self.get_in_memory_or_storage_by_block_range_while(
763 start..end,
764 |db_provider, inclusive_range, _| {
765 db_provider
766 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
767 },
768 |block_state, _| Some(block_state.hash()),
769 |_| true,
770 )
771 }
772}
773
774impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
775 fn chain_info(&self) -> ProviderResult<ChainInfo> {
776 let best_number = self.best_block_number()?;
777 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
778 }
779
780 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
781 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
782 }
783
784 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
785 self.storage_provider.last_block_number()
786 }
787
788 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
789 self.get_in_memory_or_storage_by_block(
790 hash.into(),
791 |db_provider| db_provider.block_number(hash),
792 |block_state| Ok(Some(block_state.number())),
793 )
794 }
795}
796
impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
    /// Returns the pending block number and hash as reported by the canonical in-memory state.
    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.pending_block_num_hash())
    }

    /// Returns the safe block number and hash as reported by the canonical in-memory state.
    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_safe_num_hash())
    }

    /// Returns the finalized block number and hash as reported by the canonical in-memory
    /// state.
    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
    }
}
810
811impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
812 type Block = BlockTy<N>;
813
814 fn find_block_by_hash(
815 &self,
816 hash: B256,
817 source: BlockSource,
818 ) -> ProviderResult<Option<Self::Block>> {
819 if matches!(source, BlockSource::Canonical | BlockSource::Any) {
820 if let Some(block) = self.get_in_memory_or_storage_by_block(
821 hash.into(),
822 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
823 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824 )? {
825 return Ok(Some(block))
826 }
827 }
828
829 if matches!(source, BlockSource::Pending | BlockSource::Any) {
830 return Ok(self
831 .canonical_in_memory_state
832 .pending_block()
833 .filter(|b| b.hash() == hash)
834 .map(|b| b.into_block()))
835 }
836
837 Ok(None)
838 }
839
840 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
841 self.get_in_memory_or_storage_by_block(
842 id,
843 |db_provider| db_provider.block(id),
844 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
845 )
846 }
847
848 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
849 Ok(self.canonical_in_memory_state.pending_recovered_block())
850 }
851
852 fn pending_block_and_receipts(
853 &self,
854 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
855 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
856 }
857
858 fn recovered_block(
865 &self,
866 id: BlockHashOrNumber,
867 transaction_kind: TransactionVariant,
868 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
869 self.get_in_memory_or_storage_by_block(
870 id,
871 |db_provider| db_provider.recovered_block(id, transaction_kind),
872 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
873 )
874 }
875
876 fn sealed_block_with_senders(
877 &self,
878 id: BlockHashOrNumber,
879 transaction_kind: TransactionVariant,
880 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
881 self.get_in_memory_or_storage_by_block(
882 id,
883 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
884 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
885 )
886 }
887
888 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
889 self.get_in_memory_or_storage_by_block_range_while(
890 range,
891 |db_provider, range, _| db_provider.block_range(range),
892 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
893 |_| true,
894 )
895 }
896
897 fn block_with_senders_range(
898 &self,
899 range: RangeInclusive<BlockNumber>,
900 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
901 self.get_in_memory_or_storage_by_block_range_while(
902 range,
903 |db_provider, range, _| db_provider.block_with_senders_range(range),
904 |block_state, _| Some(block_state.block().recovered_block().clone()),
905 |_| true,
906 )
907 }
908
909 fn recovered_block_range(
910 &self,
911 range: RangeInclusive<BlockNumber>,
912 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
913 self.get_in_memory_or_storage_by_block_range_while(
914 range,
915 |db_provider, range, _| db_provider.recovered_block_range(range),
916 |block_state, _| Some(block_state.block().recovered_block().clone()),
917 |_| true,
918 )
919 }
920}
921
922impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
923 type Transaction = TxTy<N>;
924
925 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
926 self.get_in_memory_or_storage_by_tx(
927 tx_hash.into(),
928 |db_provider| db_provider.transaction_id(tx_hash),
929 |_, tx_number, _| Ok(Some(tx_number)),
930 )
931 }
932
933 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
934 self.get_in_memory_or_storage_by_tx(
935 id.into(),
936 |provider| provider.transaction_by_id(id),
937 |tx_index, _, block_state| {
938 Ok(block_state
939 .block_ref()
940 .recovered_block()
941 .body()
942 .transactions()
943 .get(tx_index)
944 .cloned())
945 },
946 )
947 }
948
949 fn transaction_by_id_unhashed(
950 &self,
951 id: TxNumber,
952 ) -> ProviderResult<Option<Self::Transaction>> {
953 self.get_in_memory_or_storage_by_tx(
954 id.into(),
955 |provider| provider.transaction_by_id_unhashed(id),
956 |tx_index, _, block_state| {
957 Ok(block_state
958 .block_ref()
959 .recovered_block()
960 .body()
961 .transactions()
962 .get(tx_index)
963 .cloned())
964 },
965 )
966 }
967
968 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
969 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
970 return Ok(Some(tx))
971 }
972
973 self.storage_provider.transaction_by_hash(hash)
974 }
975
976 fn transaction_by_hash_with_meta(
977 &self,
978 tx_hash: TxHash,
979 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
980 if let Some((tx, meta)) =
981 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
982 {
983 return Ok(Some((tx, meta)))
984 }
985
986 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
987 }
988
989 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
990 self.get_in_memory_or_storage_by_tx(
991 id.into(),
992 |provider| provider.transaction_block(id),
993 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
994 )
995 }
996
997 fn transactions_by_block(
998 &self,
999 id: BlockHashOrNumber,
1000 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1001 self.get_in_memory_or_storage_by_block(
1002 id,
1003 |provider| provider.transactions_by_block(id),
1004 |block_state| {
1005 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
1006 },
1007 )
1008 }
1009
1010 fn transactions_by_block_range(
1011 &self,
1012 range: impl RangeBounds<BlockNumber>,
1013 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1014 self.get_in_memory_or_storage_by_block_range_while(
1015 range,
1016 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1017 |block_state, _| {
1018 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1019 },
1020 |_| true,
1021 )
1022 }
1023
1024 fn transactions_by_tx_range(
1025 &self,
1026 range: impl RangeBounds<TxNumber>,
1027 ) -> ProviderResult<Vec<Self::Transaction>> {
1028 self.get_in_memory_or_storage_by_tx_range(
1029 range,
1030 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1031 |index_range, block_state| {
1032 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1033 .to_vec())
1034 },
1035 )
1036 }
1037
1038 fn senders_by_tx_range(
1039 &self,
1040 range: impl RangeBounds<TxNumber>,
1041 ) -> ProviderResult<Vec<Address>> {
1042 self.get_in_memory_or_storage_by_tx_range(
1043 range,
1044 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1045 |index_range, block_state| {
1046 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1047 },
1048 )
1049 }
1050
1051 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1052 self.get_in_memory_or_storage_by_tx(
1053 id.into(),
1054 |provider| provider.transaction_sender(id),
1055 |tx_index, _, block_state| {
1056 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1057 },
1058 )
1059 }
1060}
1061
1062impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1063 type Receipt = ReceiptTy<N>;
1064
1065 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1066 self.get_in_memory_or_storage_by_tx(
1067 id.into(),
1068 |provider| provider.receipt(id),
1069 |tx_index, _, block_state| {
1070 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1071 },
1072 )
1073 }
1074
1075 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1076 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1077 let executed_block = block_state.block_ref();
1078 let block = executed_block.recovered_block();
1079 let receipts = block_state.executed_block_receipts();
1080
1081 debug_assert_eq!(
1083 block.body().transactions().len(),
1084 receipts.len(),
1085 "Mismatch between transaction and receipt count"
1086 );
1087
1088 if let Some(tx_index) =
1089 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1090 {
1091 return Ok(receipts.get(tx_index).cloned());
1093 }
1094 }
1095
1096 self.storage_provider.receipt_by_hash(hash)
1097 }
1098
1099 fn receipts_by_block(
1100 &self,
1101 block: BlockHashOrNumber,
1102 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1103 self.get_in_memory_or_storage_by_block(
1104 block,
1105 |db_provider| db_provider.receipts_by_block(block),
1106 |block_state| Ok(Some(block_state.executed_block_receipts())),
1107 )
1108 }
1109
1110 fn receipts_by_tx_range(
1111 &self,
1112 range: impl RangeBounds<TxNumber>,
1113 ) -> ProviderResult<Vec<Self::Receipt>> {
1114 self.get_in_memory_or_storage_by_tx_range(
1115 range,
1116 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1117 |index_range, block_state| {
1118 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1119 },
1120 )
1121 }
1122
1123 fn receipts_by_block_range(
1124 &self,
1125 block_range: RangeInclusive<BlockNumber>,
1126 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1127 self.storage_provider.receipts_by_block_range(block_range)
1128 }
1129}
1130
1131impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1132 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1133 match block {
1134 BlockId::Hash(rpc_block_hash) => {
1135 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1136 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1137 if let Some(state) = self
1138 .head_block
1139 .as_ref()
1140 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1141 {
1142 receipts = Some(state.executed_block_receipts());
1143 }
1144 }
1145 Ok(receipts)
1146 }
1147 BlockId::Number(num_tag) => match num_tag {
1148 BlockNumberOrTag::Pending => Ok(self
1149 .canonical_in_memory_state
1150 .pending_state()
1151 .map(|block_state| block_state.executed_block_receipts())),
1152 _ => {
1153 if let Some(num) = self.convert_block_number(num_tag)? {
1154 self.receipts_by_block(num.into())
1155 } else {
1156 Ok(None)
1157 }
1158 }
1159 },
1160 }
1161 }
1162}
1163
1164impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1165 fn block_body_indices(
1166 &self,
1167 number: BlockNumber,
1168 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1169 self.get_in_memory_or_storage_by_block(
1170 number.into(),
1171 |db_provider| db_provider.block_body_indices(number),
1172 |block_state| {
1173 let last_storage_block_number = block_state.anchor().number;
1175 let mut stored_indices = self
1176 .storage_provider
1177 .block_body_indices(last_storage_block_number)?
1178 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1179
1180 stored_indices.first_tx_num = stored_indices.next_tx_num();
1182 stored_indices.tx_count = 0;
1183
1184 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1186 let block_tx_count =
1187 state.block_ref().recovered_block().body().transactions().len() as u64;
1188 if state.block_ref().recovered_block().number() == number {
1189 stored_indices.tx_count = block_tx_count;
1190 } else {
1191 stored_indices.first_tx_num += block_tx_count;
1192 }
1193 }
1194
1195 Ok(Some(stored_indices))
1196 },
1197 )
1198 }
1199
1200 fn block_body_indices_range(
1201 &self,
1202 range: RangeInclusive<BlockNumber>,
1203 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1204 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1205 }
1206}
1207
1208impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1209 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1210 self.storage_provider.get_stage_checkpoint(id)
1211 }
1212
1213 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1214 self.storage_provider.get_stage_checkpoint_progress(id)
1215 }
1216
1217 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1218 self.storage_provider.get_all_checkpoints()
1219 }
1220}
1221
1222impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1223 fn get_prune_checkpoint(
1224 &self,
1225 segment: PruneSegment,
1226 ) -> ProviderResult<Option<PruneCheckpoint>> {
1227 self.storage_provider.get_prune_checkpoint(segment)
1228 }
1229
1230 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1231 self.storage_provider.get_prune_checkpoints()
1232 }
1233}
1234
1235impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1236 type ChainSpec = N::ChainSpec;
1237
1238 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1239 ChainSpecProvider::chain_spec(&self.storage_provider)
1240 }
1241}
1242
1243impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1244 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1245 match id {
1246 BlockId::Number(num) => self.block_by_number_or_tag(num),
1247 BlockId::Hash(hash) => {
1248 if Some(true) == hash.require_canonical {
1253 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1255 } else {
1256 self.block_by_hash(hash.block_hash)
1257 }
1258 }
1259 }
1260 }
1261
1262 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1263 Ok(match id {
1264 BlockNumberOrTag::Latest => {
1265 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1266 }
1267 BlockNumberOrTag::Finalized => {
1268 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1269 }
1270 BlockNumberOrTag::Safe => {
1271 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1272 }
1273 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1274 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1275
1276 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1277 })
1278 }
1279
1280 fn sealed_header_by_number_or_tag(
1281 &self,
1282 id: BlockNumberOrTag,
1283 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1284 match id {
1285 BlockNumberOrTag::Latest => {
1286 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1287 }
1288 BlockNumberOrTag::Finalized => {
1289 Ok(self.canonical_in_memory_state.get_finalized_header())
1290 }
1291 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1292 BlockNumberOrTag::Earliest => self
1293 .header_by_number(self.earliest_block_number()?)?
1294 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1295 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1296 BlockNumberOrTag::Number(num) => self
1297 .header_by_number(num)?
1298 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1299 }
1300 }
1301
1302 fn sealed_header_by_id(
1303 &self,
1304 id: BlockId,
1305 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1306 Ok(match id {
1307 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1308 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1309 })
1310 }
1311
1312 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1313 Ok(match id {
1314 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1315 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1316 })
1317 }
1318}
1319
1320impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1321 fn storage_changeset(
1322 &self,
1323 block_number: BlockNumber,
1324 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1325 if let Some(state) =
1326 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1327 {
1328 let changesets = state
1329 .block()
1330 .execution_output
1331 .bundle
1332 .reverts
1333 .clone()
1334 .to_plain_state_reverts()
1335 .storage
1336 .into_iter()
1337 .flatten()
1338 .flat_map(|revert: PlainStorageRevert| {
1339 revert.storage_revert.into_iter().map(move |(key, value)| {
1340 (
1341 BlockNumberAddress((block_number, revert.address)),
1342 StorageEntry { key: key.into(), value: value.to_previous_value() },
1343 )
1344 })
1345 })
1346 .collect();
1347 Ok(changesets)
1348 } else {
1349 let storage_history_exists = self
1353 .storage_provider
1354 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1355 .and_then(|checkpoint| {
1356 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1361 })
1362 .unwrap_or(true);
1363
1364 if !storage_history_exists {
1365 return Err(ProviderError::StateAtBlockPruned(block_number))
1366 }
1367
1368 self.storage_provider.storage_changeset(block_number)
1369 }
1370 }
1371}
1372
1373impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1374 fn account_block_changeset(
1375 &self,
1376 block_number: BlockNumber,
1377 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1378 if let Some(state) =
1379 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1380 {
1381 let changesets = state
1382 .block_ref()
1383 .execution_output
1384 .bundle
1385 .reverts
1386 .clone()
1387 .to_plain_state_reverts()
1388 .accounts
1389 .into_iter()
1390 .flatten()
1391 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1392 .collect();
1393 Ok(changesets)
1394 } else {
1395 let account_history_exists = self
1399 .storage_provider
1400 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1401 .and_then(|checkpoint| {
1402 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1407 })
1408 .unwrap_or(true);
1409
1410 if !account_history_exists {
1411 return Err(ProviderError::StateAtBlockPruned(block_number))
1412 }
1413
1414 self.storage_provider.account_block_changeset(block_number)
1415 }
1416 }
1417}
1418
1419impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1420 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1422 let state_provider = self.latest_ref()?;
1424 state_provider.basic_account(address)
1425 }
1426}
1427
1428impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1429 type Receipt = ReceiptTy<N>;
1430
1431 fn get_state(
1441 &self,
1442 block: BlockNumber,
1443 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1444 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1445 let state = state.block_ref().execution_outcome().clone();
1446 Ok(Some(state))
1447 } else {
1448 Self::get_state(self, block..=block)
1449 }
1450 }
1451}
1452
#[cfg(test)]
mod tests {
    use crate::{
        providers::blockchain_provider::BlockchainProvider,
        test_utils::create_test_provider_factory, BlockWriter,
    };
    use alloy_eips::BlockHashOrNumber;
    use alloy_primitives::B256;
    use itertools::Itertools;
    use rand::Rng;
    use reth_chain_state::{
        ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, NewCanonicalChain,
    };
    use reth_db_api::models::AccountBeforeTx;
    use reth_ethereum_primitives::Block;
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use revm_database::BundleState;
    use std::{
        ops::{Bound, Range, RangeBounds},
        sync::Arc,
    };

    /// Number of blocks written to the database by tests built on [`random_blocks`].
    const TEST_BLOCKS_COUNT: usize = 5;

    /// Generates `database_blocks + in_memory_blocks` random blocks numbered from 0 and splits
    /// them into the leading `database_blocks` blocks and the remaining ones.
    ///
    /// `tx_count` may be any range over `u8`; it is normalised into the half-open range handed
    /// to [`BlockRangeParams`]. Inclusive and exclusive bounds are honoured as written, and an
    /// inclusive end of `u8::MAX` saturates instead of overflowing.
    fn random_blocks(
        rng: &mut impl Rng,
        database_blocks: usize,
        in_memory_blocks: usize,
        requests_count: Option<Range<u8>>,
        withdrawals_count: Option<Range<u8>>,
        tx_count: impl RangeBounds<u8>,
    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
        // Number of the last block to generate (blocks start at 0).
        let block_range = (database_blocks + in_memory_blocks - 1) as u64;

        // Inclusive lower bound: an excluded start begins one past the given value.
        let tx_start = match tx_count.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n.saturating_add(1),
            Bound::Unbounded => u8::MIN,
        };
        // Exclusive upper bound: only an included end needs to be moved past the given value.
        let tx_end = match tx_count.end_bound() {
            Bound::Included(&n) => n.saturating_add(1),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => u8::MAX,
        };

        let blocks = random_block_range(
            rng,
            0..=block_range,
            BlockRangeParams {
                parent: Some(B256::ZERO),
                tx_count: tx_start..tx_end,
                requests_count,
                withdrawals_count,
            },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
        (database_blocks.to_vec(), in_memory_blocks.to_vec())
    }

    /// Checks `find_block_by_hash` for database, in-memory and pending blocks across the
    /// different [`BlockSource`] variants.
    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        // Random number generator and provider factory.
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate random blocks numbered 0..=10; the first 5 go to the database, the rest
        // are kept for the in-memory state.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert the database blocks and commit.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test.
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks referenced throughout the test.
        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // Before the in-memory state is populated the in-memory block is unknown.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        // There is no pending block yet either.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory state.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlockWithTrieUpdates::new(
                Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                Default::default(),
                Default::default(),
                ExecutedTrieUpdates::empty(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // Re-create the consistent provider after the in-memory state changed.
        let consistent_provider = provider.consistent_provider()?;

        // Now the block is found in memory.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The first database block is found by hash.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into_block())
        );

        // A database block is never reported as pending.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Set the last in-memory block as the pending block.
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(RecoveredBlock::new_sealed(
                    last_in_mem_block.clone(),
                    Default::default(),
                )),
                execution_output: Default::default(),
                hashed_state: Default::default(),
            },
            trie: ExecutedTrieUpdates::empty(),
        });

        // The pending block is now found through `BlockSource::Pending`.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone_block())
        );

        Ok(())
    }

    /// Checks `block` lookups by hash and by number for database and in-memory blocks.
    #[test]
    fn test_block_reader_block() -> eyre::Result<()> {
        // Random number generator and provider factory.
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate random blocks numbered 0..=10; the first 5 go to the database, the rest
        // are kept for the in-memory state.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Insert the database blocks and commit.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().try_recover().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test.
        let provider = BlockchainProvider::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // First in-memory block.
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        // First database block.
        let first_db_block = database_blocks.first().unwrap();

        // Before the in-memory state is populated the in-memory block is unknown.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            None
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory state.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlockWithTrieUpdates::new(
                Arc::new(RecoveredBlock::new_sealed(
                    first_in_mem_block.clone(),
                    in_memory_block_senders,
                )),
                Default::default(),
                Default::default(),
                ExecutedTrieUpdates::empty(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);

        // Re-create the consistent provider after the in-memory state changed.
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            Some(first_in_mem_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            Some(first_in_mem_block.clone().into_block())
        );

        // The database block is found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
            Some(first_db_block.clone().into_block())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
            Some(first_db_block.clone().into_block())
        );

        Ok(())
    }

    /// Checks `account_block_changeset` for the last database block and for the first
    /// in-memory block.
    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        // `TEST_BLOCKS_COUNT` database blocks followed by a single in-memory block.
        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        // Changesets for the database blocks, starting from the generated accounts.
        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        // Changesets for the in-memory block, continuing from the database state.
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        // Write the database blocks together with their state and account reverts.
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.try_recover().expect("failed to seal block with senders"))
                .collect(),
            &ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets
                        .iter()
                        .map(|block_changesets| {
                            block_changesets.iter().map(|(address, account, _)| {
                                (*address, Some(Some((*account).into())), [])
                            })
                        })
                        .collect::<Vec<_>>(),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider::new(factory)?;

        // Commit the single in-memory block, with its reverts, to the in-memory state.
        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlockWithTrieUpdates::new(
                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
                        Arc::new(ExecutionOutcome {
                            bundle: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            first_block: first_in_memory_block,
                            ..Default::default()
                        }),
                        Default::default(),
                        ExecutedTrieUpdates::empty(),
                    )
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        let consistent_provider = provider.consistent_provider()?;

        // Changeset of the last database block, compared in address order.
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .next_back()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // Changeset of the in-memory block, compared in address order.
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
}