1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber, BlockId, BlockNumHash,
12 BlockNumberOrTag, HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::{ChainInfo, EthereumHardforks};
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24 Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
30 StateProvider, StorageChangeSetReader,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
221 &self,
222 account_changeset: Vec<(u64, AccountBeforeTx)>,
223 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224 block_range_end: BlockNumber,
225 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226 let mut state: BundleStateInit = HashMap::default();
227 let mut reverts: RevertsInit = HashMap::default();
228 let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230 for (block_number, account_before) in account_changeset.into_iter().rev() {
232 let AccountBeforeTx { info: old_info, address } = account_before;
233 match state.entry(address) {
234 hash_map::Entry::Vacant(entry) => {
235 let new_info = state_provider.basic_account(&address)?;
236 entry.insert((old_info, new_info, HashMap::default()));
237 }
238 hash_map::Entry::Occupied(mut entry) => {
239 entry.get_mut().0 = old_info;
241 }
242 }
243 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245 }
246
247 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249 let BlockNumberAddress((block_number, address)) = block_and_address;
250 let account_state = match state.entry(address) {
252 hash_map::Entry::Vacant(entry) => {
253 let present_info = state_provider.basic_account(&address)?;
254 entry.insert((present_info, present_info, HashMap::default()))
255 }
256 hash_map::Entry::Occupied(entry) => entry.into_mut(),
257 };
258
259 match account_state.2.entry(old_storage.key) {
261 hash_map::Entry::Vacant(entry) => {
262 let new_storage_value =
263 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264 entry.insert((old_storage.value, new_storage_value));
265 }
266 hash_map::Entry::Occupied(mut entry) => {
267 entry.get_mut().0 = old_storage.value;
268 }
269 };
270
271 reverts
272 .entry(block_number)
273 .or_default()
274 .entry(address)
275 .or_default()
276 .1
277 .push(old_storage);
278 }
279
280 Ok((state, reverts))
281 }
282
283 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295 &self,
296 range: impl RangeBounds<BlockNumber>,
297 fetch_db_range: F,
298 map_block_state_item: G,
299 mut predicate: P,
300 ) -> ProviderResult<Vec<T>>
301 where
302 F: FnOnce(
303 &DatabaseProviderRO<N::DB, N>,
304 RangeInclusive<BlockNumber>,
305 &mut P,
306 ) -> ProviderResult<Vec<T>>,
307 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308 P: FnMut(&T) -> bool,
309 {
310 let mut in_memory_chain =
318 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319 let db_provider = &self.storage_provider;
320
321 let (start, end) = self.convert_range_bounds(range, || {
322 in_memory_chain
324 .first()
325 .map(|b| b.number())
326 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327 });
328
329 if start > end {
330 return Ok(vec![])
331 }
332
333 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338 Some(lowest_memory_block) if lowest_memory_block <= end => {
339 let highest_memory_block =
340 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342 let in_memory_range =
346 lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348 in_memory_chain.truncate(
351 in_memory_chain
352 .len()
353 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354 );
355
356 let storage_range =
357 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359 (Some((in_memory_chain, in_memory_range)), storage_range)
360 }
361 _ => {
362 drop(in_memory_chain);
364
365 (None, Some(start..=end))
366 }
367 };
368
369 let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371 if let Some(storage_range) = storage_range {
372 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373 items.append(&mut db_items);
374
375 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378 return Ok(items)
379 }
380 }
381
382 if let Some((in_memory_chain, in_memory_range)) = in_memory {
383 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384 debug_assert!(num == block.number());
385 if let Some(item) = map_block_state_item(block, &mut predicate) {
386 items.push(item);
387 } else {
388 break
389 }
390 }
391 }
392
393 Ok(items)
394 }
395
396 fn block_state_provider_ref(
398 &self,
399 state: &BlockState<N::Primitives>,
400 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401 let anchor_hash = state.anchor().hash;
402 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405 }
406
407 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413 &self,
414 range: impl RangeBounds<BlockNumber>,
415 fetch_from_db: S,
416 fetch_from_block_state: M,
417 ) -> ProviderResult<Vec<R>>
418 where
419 S: FnOnce(
420 &DatabaseProviderRO<N::DB, N>,
421 RangeInclusive<TxNumber>,
422 ) -> ProviderResult<Vec<R>>,
423 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424 {
425 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426 let provider = &self.storage_provider;
427
428 let last_database_block_number = in_mem_chain
431 .last()
432 .map(|b| Ok(b.anchor().number))
433 .unwrap_or_else(|| provider.last_block_number())?;
434
435 let last_block_body_index = provider
438 .block_body_indices(last_database_block_number)?
439 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442 let (start, end) = self.convert_range_bounds(range, || {
443 in_mem_chain
444 .iter()
445 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446 .sum::<u64>() +
447 last_block_body_index.last_tx_num()
448 });
449
450 if start > end {
451 return Ok(vec![])
452 }
453
454 let mut tx_range = start..=end;
455
456 if *tx_range.end() < in_memory_tx_num {
459 return fetch_from_db(provider, tx_range);
460 }
461
462 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464 if *tx_range.start() < in_memory_tx_num {
466 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469 tx_range = in_memory_tx_num..=*tx_range.end();
471
472 items.extend(fetch_from_db(provider, db_range)?);
473 }
474
475 for block_state in in_mem_chain.iter().rev() {
477 let block_tx_count =
478 block_state.block_ref().recovered_block().body().transactions().len();
479 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484 in_memory_tx_num += block_tx_count as u64;
485 continue
486 }
487
488 let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491 items.extend(fetch_from_block_state(
492 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493 block_state,
494 )?);
495
496 in_memory_tx_num += block_tx_count as u64;
497
498 if in_memory_tx_num > *tx_range.end() {
500 break
501 }
502
503 tx_range = in_memory_tx_num..=*tx_range.end();
505 }
506
507 Ok(items)
508 }
509
510 fn get_in_memory_or_storage_by_tx<S, M, R>(
513 &self,
514 id: HashOrNumber,
515 fetch_from_db: S,
516 fetch_from_block_state: M,
517 ) -> ProviderResult<Option<R>>
518 where
519 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521 {
522 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523 let provider = &self.storage_provider;
524
525 let last_database_block_number = in_mem_chain
528 .last()
529 .map(|b| Ok(b.anchor().number))
530 .unwrap_or_else(|| provider.last_block_number())?;
531
532 let last_block_body_index = provider
535 .block_body_indices(last_database_block_number)?
536 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539 if let HashOrNumber::Number(id) = id {
542 if id < in_memory_tx_num {
543 return fetch_from_db(provider)
544 }
545 }
546
547 for block_state in in_mem_chain.iter().rev() {
549 let executed_block = block_state.block_ref();
550 let block = executed_block.recovered_block();
551
552 for tx_index in 0..block.body().transactions().len() {
553 match id {
554 HashOrNumber::Hash(tx_hash) => {
555 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557 }
558 }
559 HashOrNumber::Number(id) => {
560 if id == in_memory_tx_num {
561 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562 }
563 }
564 }
565
566 in_memory_tx_num += 1;
567 }
568 }
569
570 if let HashOrNumber::Hash(_) = id {
572 return fetch_from_db(provider)
573 }
574
575 Ok(None)
576 }
577
578 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580 &self,
581 id: BlockHashOrNumber,
582 fetch_from_db: S,
583 fetch_from_block_state: M,
584 ) -> ProviderResult<R>
585 where
586 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588 {
589 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590 return fetch_from_block_state(block_state)
591 }
592 fetch_from_db(&self.storage_provider)
593 }
594}
595
596impl<N: ProviderNodeTypes> ConsistentProvider<N> {
597 #[inline]
606 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
607 let latest = self.best_block_number()?;
608 if block_number > latest {
609 Err(ProviderError::HeaderNotFound(block_number.into()))
610 } else {
611 Ok(())
612 }
613 }
614}
615
616impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
617 type Primitives = N::Primitives;
618}
619
620impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
621 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
622 self.storage_provider.static_file_provider()
623 }
624}
625
626impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
627 type Header = HeaderTy<N>;
628
629 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
630 self.get_in_memory_or_storage_by_block(
631 (*block_hash).into(),
632 |db_provider| db_provider.header(block_hash),
633 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
634 )
635 }
636
637 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
638 self.get_in_memory_or_storage_by_block(
639 num.into(),
640 |db_provider| db_provider.header_by_number(num),
641 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
642 )
643 }
644
645 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
646 if let Some(num) = self.block_number(*hash)? {
647 self.header_td_by_number(num)
648 } else {
649 Ok(None)
650 }
651 }
652
653 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
654 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
655 {
656 if let Some(last_finalized_num_hash) =
663 self.canonical_in_memory_state.get_finalized_num_hash()
664 {
665 last_finalized_num_hash.number
666 } else {
667 self.last_block_number()?
668 }
669 } else {
670 number
672 };
673 self.storage_provider.header_td_by_number(number)
674 }
675
676 fn headers_range(
677 &self,
678 range: impl RangeBounds<BlockNumber>,
679 ) -> ProviderResult<Vec<Self::Header>> {
680 self.get_in_memory_or_storage_by_block_range_while(
681 range,
682 |db_provider, range, _| db_provider.headers_range(range),
683 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
684 |_| true,
685 )
686 }
687
688 fn sealed_header(
689 &self,
690 number: BlockNumber,
691 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
692 self.get_in_memory_or_storage_by_block(
693 number.into(),
694 |db_provider| db_provider.sealed_header(number),
695 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
696 )
697 }
698
699 fn sealed_headers_range(
700 &self,
701 range: impl RangeBounds<BlockNumber>,
702 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
703 self.get_in_memory_or_storage_by_block_range_while(
704 range,
705 |db_provider, range, _| db_provider.sealed_headers_range(range),
706 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
707 |_| true,
708 )
709 }
710
711 fn sealed_headers_while(
712 &self,
713 range: impl RangeBounds<BlockNumber>,
714 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
715 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
716 self.get_in_memory_or_storage_by_block_range_while(
717 range,
718 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
719 |block_state, predicate| {
720 let header = block_state.block_ref().recovered_block().sealed_header();
721 predicate(header).then(|| header.clone())
722 },
723 predicate,
724 )
725 }
726}
727
728impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
729 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
730 self.get_in_memory_or_storage_by_block(
731 number.into(),
732 |db_provider| db_provider.block_hash(number),
733 |block_state| Ok(Some(block_state.hash())),
734 )
735 }
736
737 fn canonical_hashes_range(
738 &self,
739 start: BlockNumber,
740 end: BlockNumber,
741 ) -> ProviderResult<Vec<B256>> {
742 self.get_in_memory_or_storage_by_block_range_while(
743 start..end,
744 |db_provider, inclusive_range, _| {
745 db_provider
746 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
747 },
748 |block_state, _| Some(block_state.hash()),
749 |_| true,
750 )
751 }
752}
753
754impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
755 fn chain_info(&self) -> ProviderResult<ChainInfo> {
756 let best_number = self.best_block_number()?;
757 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
758 }
759
760 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
761 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
762 }
763
764 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
765 self.storage_provider.last_block_number()
766 }
767
768 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
769 self.get_in_memory_or_storage_by_block(
770 hash.into(),
771 |db_provider| db_provider.block_number(hash),
772 |block_state| Ok(Some(block_state.number())),
773 )
774 }
775}
776
777impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
778 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
779 Ok(self.canonical_in_memory_state.pending_block_num_hash())
780 }
781
782 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
783 Ok(self.canonical_in_memory_state.get_safe_num_hash())
784 }
785
786 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
788 }
789}
790
791impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
792 type Block = BlockTy<N>;
793
794 fn find_block_by_hash(
795 &self,
796 hash: B256,
797 source: BlockSource,
798 ) -> ProviderResult<Option<Self::Block>> {
799 if matches!(source, BlockSource::Canonical | BlockSource::Any) {
800 if let Some(block) = self.get_in_memory_or_storage_by_block(
801 hash.into(),
802 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
803 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
804 )? {
805 return Ok(Some(block))
806 }
807 }
808
809 if matches!(source, BlockSource::Pending | BlockSource::Any) {
810 return Ok(self
811 .canonical_in_memory_state
812 .pending_block()
813 .filter(|b| b.hash() == hash)
814 .map(|b| b.into_block()))
815 }
816
817 Ok(None)
818 }
819
820 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
821 self.get_in_memory_or_storage_by_block(
822 id,
823 |db_provider| db_provider.block(id),
824 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
825 )
826 }
827
828 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
829 Ok(self.canonical_in_memory_state.pending_block())
830 }
831
832 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
833 Ok(self.canonical_in_memory_state.pending_recovered_block())
834 }
835
836 fn pending_block_and_receipts(
837 &self,
838 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
839 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
840 }
841
842 fn recovered_block(
849 &self,
850 id: BlockHashOrNumber,
851 transaction_kind: TransactionVariant,
852 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
853 self.get_in_memory_or_storage_by_block(
854 id,
855 |db_provider| db_provider.recovered_block(id, transaction_kind),
856 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
857 )
858 }
859
860 fn sealed_block_with_senders(
861 &self,
862 id: BlockHashOrNumber,
863 transaction_kind: TransactionVariant,
864 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
865 self.get_in_memory_or_storage_by_block(
866 id,
867 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
868 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
869 )
870 }
871
872 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
873 self.get_in_memory_or_storage_by_block_range_while(
874 range,
875 |db_provider, range, _| db_provider.block_range(range),
876 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
877 |_| true,
878 )
879 }
880
881 fn block_with_senders_range(
882 &self,
883 range: RangeInclusive<BlockNumber>,
884 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
885 self.get_in_memory_or_storage_by_block_range_while(
886 range,
887 |db_provider, range, _| db_provider.block_with_senders_range(range),
888 |block_state, _| Some(block_state.block().recovered_block().clone()),
889 |_| true,
890 )
891 }
892
893 fn recovered_block_range(
894 &self,
895 range: RangeInclusive<BlockNumber>,
896 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
897 self.get_in_memory_or_storage_by_block_range_while(
898 range,
899 |db_provider, range, _| db_provider.recovered_block_range(range),
900 |block_state, _| Some(block_state.block().recovered_block().clone()),
901 |_| true,
902 )
903 }
904}
905
906impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
907 type Transaction = TxTy<N>;
908
909 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
910 self.get_in_memory_or_storage_by_tx(
911 tx_hash.into(),
912 |db_provider| db_provider.transaction_id(tx_hash),
913 |_, tx_number, _| Ok(Some(tx_number)),
914 )
915 }
916
917 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
918 self.get_in_memory_or_storage_by_tx(
919 id.into(),
920 |provider| provider.transaction_by_id(id),
921 |tx_index, _, block_state| {
922 Ok(block_state
923 .block_ref()
924 .recovered_block()
925 .body()
926 .transactions()
927 .get(tx_index)
928 .cloned())
929 },
930 )
931 }
932
933 fn transaction_by_id_unhashed(
934 &self,
935 id: TxNumber,
936 ) -> ProviderResult<Option<Self::Transaction>> {
937 self.get_in_memory_or_storage_by_tx(
938 id.into(),
939 |provider| provider.transaction_by_id_unhashed(id),
940 |tx_index, _, block_state| {
941 Ok(block_state
942 .block_ref()
943 .recovered_block()
944 .body()
945 .transactions()
946 .get(tx_index)
947 .cloned())
948 },
949 )
950 }
951
952 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
953 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
954 return Ok(Some(tx))
955 }
956
957 self.storage_provider.transaction_by_hash(hash)
958 }
959
960 fn transaction_by_hash_with_meta(
961 &self,
962 tx_hash: TxHash,
963 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
964 if let Some((tx, meta)) =
965 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
966 {
967 return Ok(Some((tx, meta)))
968 }
969
970 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
971 }
972
973 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
974 self.get_in_memory_or_storage_by_tx(
975 id.into(),
976 |provider| provider.transaction_block(id),
977 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
978 )
979 }
980
981 fn transactions_by_block(
982 &self,
983 id: BlockHashOrNumber,
984 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
985 self.get_in_memory_or_storage_by_block(
986 id,
987 |provider| provider.transactions_by_block(id),
988 |block_state| {
989 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
990 },
991 )
992 }
993
994 fn transactions_by_block_range(
995 &self,
996 range: impl RangeBounds<BlockNumber>,
997 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
998 self.get_in_memory_or_storage_by_block_range_while(
999 range,
1000 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1001 |block_state, _| {
1002 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1003 },
1004 |_| true,
1005 )
1006 }
1007
1008 fn transactions_by_tx_range(
1009 &self,
1010 range: impl RangeBounds<TxNumber>,
1011 ) -> ProviderResult<Vec<Self::Transaction>> {
1012 self.get_in_memory_or_storage_by_tx_range(
1013 range,
1014 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1015 |index_range, block_state| {
1016 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1017 .to_vec())
1018 },
1019 )
1020 }
1021
1022 fn senders_by_tx_range(
1023 &self,
1024 range: impl RangeBounds<TxNumber>,
1025 ) -> ProviderResult<Vec<Address>> {
1026 self.get_in_memory_or_storage_by_tx_range(
1027 range,
1028 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1029 |index_range, block_state| {
1030 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1031 },
1032 )
1033 }
1034
1035 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036 self.get_in_memory_or_storage_by_tx(
1037 id.into(),
1038 |provider| provider.transaction_sender(id),
1039 |tx_index, _, block_state| {
1040 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1041 },
1042 )
1043 }
1044}
1045
1046impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1047 type Receipt = ReceiptTy<N>;
1048
1049 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1050 self.get_in_memory_or_storage_by_tx(
1051 id.into(),
1052 |provider| provider.receipt(id),
1053 |tx_index, _, block_state| {
1054 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1055 },
1056 )
1057 }
1058
1059 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1060 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1061 let executed_block = block_state.block_ref();
1062 let block = executed_block.recovered_block();
1063 let receipts = block_state.executed_block_receipts();
1064
1065 debug_assert_eq!(
1067 block.body().transactions().len(),
1068 receipts.len(),
1069 "Mismatch between transaction and receipt count"
1070 );
1071
1072 if let Some(tx_index) =
1073 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1074 {
1075 return Ok(receipts.get(tx_index).cloned());
1077 }
1078 }
1079
1080 self.storage_provider.receipt_by_hash(hash)
1081 }
1082
1083 fn receipts_by_block(
1084 &self,
1085 block: BlockHashOrNumber,
1086 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1087 self.get_in_memory_or_storage_by_block(
1088 block,
1089 |db_provider| db_provider.receipts_by_block(block),
1090 |block_state| Ok(Some(block_state.executed_block_receipts())),
1091 )
1092 }
1093
1094 fn receipts_by_tx_range(
1095 &self,
1096 range: impl RangeBounds<TxNumber>,
1097 ) -> ProviderResult<Vec<Self::Receipt>> {
1098 self.get_in_memory_or_storage_by_tx_range(
1099 range,
1100 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1101 |index_range, block_state| {
1102 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1103 },
1104 )
1105 }
1106}
1107
1108impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1109 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1110 match block {
1111 BlockId::Hash(rpc_block_hash) => {
1112 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1113 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1114 if let Some(state) = self
1115 .head_block
1116 .as_ref()
1117 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1118 {
1119 receipts = Some(state.executed_block_receipts());
1120 }
1121 }
1122 Ok(receipts)
1123 }
1124 BlockId::Number(num_tag) => match num_tag {
1125 BlockNumberOrTag::Pending => Ok(self
1126 .canonical_in_memory_state
1127 .pending_state()
1128 .map(|block_state| block_state.executed_block_receipts())),
1129 _ => {
1130 if let Some(num) = self.convert_block_number(num_tag)? {
1131 self.receipts_by_block(num.into())
1132 } else {
1133 Ok(None)
1134 }
1135 }
1136 },
1137 }
1138 }
1139}
1140
1141impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1142 fn withdrawals_by_block(
1143 &self,
1144 id: BlockHashOrNumber,
1145 timestamp: u64,
1146 ) -> ProviderResult<Option<Withdrawals>> {
1147 if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1148 return Ok(None)
1149 }
1150
1151 self.get_in_memory_or_storage_by_block(
1152 id,
1153 |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1154 |block_state| {
1155 Ok(block_state.block_ref().recovered_block().body().withdrawals().cloned())
1156 },
1157 )
1158 }
1159}
1160
1161impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1162 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1163 self.get_in_memory_or_storage_by_block(
1164 id,
1165 |db_provider| db_provider.ommers(id),
1166 |block_state| {
1167 if self.chain_spec().is_paris_active_at_block(block_state.number()) {
1168 return Ok(Some(Vec::new()))
1169 }
1170
1171 Ok(block_state.block_ref().recovered_block().body().ommers().map(|o| o.to_vec()))
1172 },
1173 )
1174 }
1175}
1176
1177impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1178 fn block_body_indices(
1179 &self,
1180 number: BlockNumber,
1181 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1182 self.get_in_memory_or_storage_by_block(
1183 number.into(),
1184 |db_provider| db_provider.block_body_indices(number),
1185 |block_state| {
1186 let last_storage_block_number = block_state.anchor().number;
1188 let mut stored_indices = self
1189 .storage_provider
1190 .block_body_indices(last_storage_block_number)?
1191 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1192
1193 stored_indices.first_tx_num = stored_indices.next_tx_num();
1195 stored_indices.tx_count = 0;
1196
1197 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1199 let block_tx_count =
1200 state.block_ref().recovered_block().body().transactions().len() as u64;
1201 if state.block_ref().recovered_block().number() == number {
1202 stored_indices.tx_count = block_tx_count;
1203 } else {
1204 stored_indices.first_tx_num += block_tx_count;
1205 }
1206 }
1207
1208 Ok(Some(stored_indices))
1209 },
1210 )
1211 }
1212
1213 fn block_body_indices_range(
1214 &self,
1215 range: RangeInclusive<BlockNumber>,
1216 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1217 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1218 }
1219}
1220
1221impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1222 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1223 self.storage_provider.get_stage_checkpoint(id)
1224 }
1225
1226 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1227 self.storage_provider.get_stage_checkpoint_progress(id)
1228 }
1229
1230 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1231 self.storage_provider.get_all_checkpoints()
1232 }
1233}
1234
1235impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1236 fn get_prune_checkpoint(
1237 &self,
1238 segment: PruneSegment,
1239 ) -> ProviderResult<Option<PruneCheckpoint>> {
1240 self.storage_provider.get_prune_checkpoint(segment)
1241 }
1242
1243 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1244 self.storage_provider.get_prune_checkpoints()
1245 }
1246}
1247
1248impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1249 type ChainSpec = N::ChainSpec;
1250
1251 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1252 ChainSpecProvider::chain_spec(&self.storage_provider)
1253 }
1254}
1255
1256impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1257 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1258 match id {
1259 BlockId::Number(num) => self.block_by_number_or_tag(num),
1260 BlockId::Hash(hash) => {
1261 if Some(true) == hash.require_canonical {
1266 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1268 } else {
1269 self.block_by_hash(hash.block_hash)
1270 }
1271 }
1272 }
1273 }
1274
1275 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1276 Ok(match id {
1277 BlockNumberOrTag::Latest => {
1278 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1279 }
1280 BlockNumberOrTag::Finalized => {
1281 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1282 }
1283 BlockNumberOrTag::Safe => {
1284 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1285 }
1286 BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1287 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1288
1289 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1290 })
1291 }
1292
1293 fn sealed_header_by_number_or_tag(
1294 &self,
1295 id: BlockNumberOrTag,
1296 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1297 match id {
1298 BlockNumberOrTag::Latest => {
1299 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1300 }
1301 BlockNumberOrTag::Finalized => {
1302 Ok(self.canonical_in_memory_state.get_finalized_header())
1303 }
1304 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1305 BlockNumberOrTag::Earliest => self
1306 .header_by_number(0)?
1307 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1308 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1309 BlockNumberOrTag::Number(num) => self
1310 .header_by_number(num)?
1311 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1312 }
1313 }
1314
1315 fn sealed_header_by_id(
1316 &self,
1317 id: BlockId,
1318 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1319 Ok(match id {
1320 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1321 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1322 })
1323 }
1324
1325 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1326 Ok(match id {
1327 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1328 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1329 })
1330 }
1331
1332 fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1333 match id {
1334 BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1335 BlockId::Hash(hash) => {
1336 self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1339 }
1340 }
1341 }
1342}
1343
1344impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1345 fn storage_changeset(
1346 &self,
1347 block_number: BlockNumber,
1348 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1349 if let Some(state) =
1350 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1351 {
1352 let changesets = state
1353 .block()
1354 .execution_output
1355 .bundle
1356 .reverts
1357 .clone()
1358 .to_plain_state_reverts()
1359 .storage
1360 .into_iter()
1361 .flatten()
1362 .flat_map(|revert: PlainStorageRevert| {
1363 revert.storage_revert.into_iter().map(move |(key, value)| {
1364 (
1365 BlockNumberAddress((block_number, revert.address)),
1366 StorageEntry { key: key.into(), value: value.to_previous_value() },
1367 )
1368 })
1369 })
1370 .collect();
1371 Ok(changesets)
1372 } else {
1373 let storage_history_exists = self
1377 .storage_provider
1378 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1379 .and_then(|checkpoint| {
1380 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1385 })
1386 .unwrap_or(true);
1387
1388 if !storage_history_exists {
1389 return Err(ProviderError::StateAtBlockPruned(block_number))
1390 }
1391
1392 self.storage_provider.storage_changeset(block_number)
1393 }
1394 }
1395}
1396
1397impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1398 fn account_block_changeset(
1399 &self,
1400 block_number: BlockNumber,
1401 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1402 if let Some(state) =
1403 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1404 {
1405 let changesets = state
1406 .block_ref()
1407 .execution_output
1408 .bundle
1409 .reverts
1410 .clone()
1411 .to_plain_state_reverts()
1412 .accounts
1413 .into_iter()
1414 .flatten()
1415 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1416 .collect();
1417 Ok(changesets)
1418 } else {
1419 let account_history_exists = self
1423 .storage_provider
1424 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1425 .and_then(|checkpoint| {
1426 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1431 })
1432 .unwrap_or(true);
1433
1434 if !account_history_exists {
1435 return Err(ProviderError::StateAtBlockPruned(block_number))
1436 }
1437
1438 self.storage_provider.account_block_changeset(block_number)
1439 }
1440 }
1441}
1442
1443impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1444 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1446 let state_provider = self.latest_ref()?;
1448 state_provider.basic_account(address)
1449 }
1450}
1451
1452impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1453 type Receipt = ReceiptTy<N>;
1454
1455 fn get_state(
1465 &self,
1466 block: BlockNumber,
1467 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1468 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1469 let state = state.block_ref().execution_outcome().clone();
1470 Ok(Some(state))
1471 } else {
1472 Self::get_state(self, block..=block)
1473 }
1474 }
1475}
1476
1477#[cfg(test)]
1478mod tests {
1479 use crate::{
1480 providers::blockchain_provider::BlockchainProvider,
1481 test_utils::create_test_provider_factory, BlockWriter,
1482 };
1483 use alloy_eips::BlockHashOrNumber;
1484 use alloy_primitives::B256;
1485 use itertools::Itertools;
1486 use rand::Rng;
1487 use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, NewCanonicalChain};
1488 use reth_db_api::models::AccountBeforeTx;
1489 use reth_ethereum_primitives::Block;
1490 use reth_execution_types::ExecutionOutcome;
1491 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1492 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1493 use reth_testing_utils::generators::{
1494 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1495 };
1496 use revm_database::BundleState;
1497 use std::{
1498 ops::{Bound, Range, RangeBounds},
1499 sync::Arc,
1500 };
1501
1502 const TEST_BLOCKS_COUNT: usize = 5;
1503
1504 fn random_blocks(
1505 rng: &mut impl Rng,
1506 database_blocks: usize,
1507 in_memory_blocks: usize,
1508 requests_count: Option<Range<u8>>,
1509 withdrawals_count: Option<Range<u8>>,
1510 tx_count: impl RangeBounds<u8>,
1511 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1512 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1513
1514 let tx_start = match tx_count.start_bound() {
1515 Bound::Included(&n) | Bound::Excluded(&n) => n,
1516 Bound::Unbounded => u8::MIN,
1517 };
1518 let tx_end = match tx_count.end_bound() {
1519 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1520 Bound::Unbounded => u8::MAX,
1521 };
1522
1523 let blocks = random_block_range(
1524 rng,
1525 0..=block_range,
1526 BlockRangeParams {
1527 parent: Some(B256::ZERO),
1528 tx_count: tx_start..tx_end,
1529 requests_count,
1530 withdrawals_count,
1531 },
1532 );
1533 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1534 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1535 }
1536
1537 #[test]
1538 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1539 let mut rng = generators::rng();
1541 let factory = create_test_provider_factory();
1542
1543 let blocks = random_block_range(
1545 &mut rng,
1546 0..=10,
1547 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1548 );
1549 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1550
1551 let provider_rw = factory.provider_rw()?;
1553 for block in database_blocks {
1554 provider_rw.insert_historical_block(
1555 block.clone().try_recover().expect("failed to seal block with senders"),
1556 )?;
1557 }
1558 provider_rw.commit()?;
1559
1560 let provider = BlockchainProvider::new(factory)?;
1562 let consistent_provider = provider.consistent_provider()?;
1563
1564 let first_db_block = database_blocks.first().unwrap();
1566 let first_in_mem_block = in_memory_blocks.first().unwrap();
1567 let last_in_mem_block = in_memory_blocks.last().unwrap();
1568
1569 assert_eq!(
1571 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1572 None
1573 );
1574 assert_eq!(
1575 consistent_provider
1576 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1577 None
1578 );
1579 assert_eq!(
1581 consistent_provider
1582 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1583 None
1584 );
1585
1586 let in_memory_block_senders =
1588 first_in_mem_block.senders().expect("failed to recover senders");
1589 let chain = NewCanonicalChain::Commit {
1590 new: vec![ExecutedBlockWithTrieUpdates::new(
1591 Arc::new(RecoveredBlock::new_sealed(
1592 first_in_mem_block.clone(),
1593 in_memory_block_senders,
1594 )),
1595 Default::default(),
1596 Default::default(),
1597 Default::default(),
1598 )],
1599 };
1600 consistent_provider.canonical_in_memory_state.update_chain(chain);
1601 let consistent_provider = provider.consistent_provider()?;
1602
1603 assert_eq!(
1605 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1606 Some(first_in_mem_block.clone().into_block())
1607 );
1608 assert_eq!(
1609 consistent_provider
1610 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1611 Some(first_in_mem_block.clone().into_block())
1612 );
1613
1614 assert_eq!(
1616 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1617 Some(first_db_block.clone().into_block())
1618 );
1619 assert_eq!(
1620 consistent_provider
1621 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1622 Some(first_db_block.clone().into_block())
1623 );
1624
1625 assert_eq!(
1627 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1628 None
1629 );
1630
1631 provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1633 block: ExecutedBlock {
1634 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1635 last_in_mem_block.clone(),
1636 Default::default(),
1637 )),
1638 execution_output: Default::default(),
1639 hashed_state: Default::default(),
1640 },
1641 trie: Default::default(),
1642 });
1643
1644 assert_eq!(
1646 consistent_provider
1647 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1648 Some(last_in_mem_block.clone_block())
1649 );
1650
1651 Ok(())
1652 }
1653
1654 #[test]
1655 fn test_block_reader_block() -> eyre::Result<()> {
1656 let mut rng = generators::rng();
1658 let factory = create_test_provider_factory();
1659
1660 let blocks = random_block_range(
1662 &mut rng,
1663 0..=10,
1664 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1665 );
1666 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1667
1668 let provider_rw = factory.provider_rw()?;
1670 for block in database_blocks {
1671 provider_rw.insert_historical_block(
1672 block.clone().try_recover().expect("failed to seal block with senders"),
1673 )?;
1674 }
1675 provider_rw.commit()?;
1676
1677 let provider = BlockchainProvider::new(factory)?;
1679 let consistent_provider = provider.consistent_provider()?;
1680
1681 let first_in_mem_block = in_memory_blocks.first().unwrap();
1683 let first_db_block = database_blocks.first().unwrap();
1685
1686 assert_eq!(
1688 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1689 None
1690 );
1691 assert_eq!(
1692 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1693 None
1694 );
1695
1696 let in_memory_block_senders =
1698 first_in_mem_block.senders().expect("failed to recover senders");
1699 let chain = NewCanonicalChain::Commit {
1700 new: vec![ExecutedBlockWithTrieUpdates::new(
1701 Arc::new(RecoveredBlock::new_sealed(
1702 first_in_mem_block.clone(),
1703 in_memory_block_senders,
1704 )),
1705 Default::default(),
1706 Default::default(),
1707 Default::default(),
1708 )],
1709 };
1710 consistent_provider.canonical_in_memory_state.update_chain(chain);
1711
1712 let consistent_provider = provider.consistent_provider()?;
1713
1714 assert_eq!(
1716 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1717 Some(first_in_mem_block.clone().into_block())
1718 );
1719 assert_eq!(
1720 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1721 Some(first_in_mem_block.clone().into_block())
1722 );
1723
1724 assert_eq!(
1726 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1727 Some(first_db_block.clone().into_block())
1728 );
1729 assert_eq!(
1730 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1731 Some(first_db_block.clone().into_block())
1732 );
1733
1734 Ok(())
1735 }
1736
1737 #[test]
1738 fn test_changeset_reader() -> eyre::Result<()> {
1739 let mut rng = generators::rng();
1740
1741 let (database_blocks, in_memory_blocks) =
1742 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1743
1744 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1745 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1746 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1747
1748 let accounts = random_eoa_accounts(&mut rng, 2);
1749
1750 let (database_changesets, database_state) = random_changeset_range(
1751 &mut rng,
1752 &database_blocks,
1753 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1754 0..0,
1755 0..0,
1756 );
1757 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1758 &mut rng,
1759 &in_memory_blocks,
1760 database_state
1761 .iter()
1762 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1763 0..0,
1764 0..0,
1765 );
1766
1767 let factory = create_test_provider_factory();
1768
1769 let provider_rw = factory.provider_rw()?;
1770 provider_rw.append_blocks_with_state(
1771 database_blocks
1772 .into_iter()
1773 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1774 .collect(),
1775 &ExecutionOutcome {
1776 bundle: BundleState::new(
1777 database_state.into_iter().map(|(address, (account, _))| {
1778 (address, None, Some(account.into()), Default::default())
1779 }),
1780 database_changesets
1781 .iter()
1782 .map(|block_changesets| {
1783 block_changesets.iter().map(|(address, account, _)| {
1784 (*address, Some(Some((*account).into())), [])
1785 })
1786 })
1787 .collect::<Vec<_>>(),
1788 Vec::new(),
1789 ),
1790 first_block: first_database_block,
1791 ..Default::default()
1792 },
1793 Default::default(),
1794 Default::default(),
1795 )?;
1796 provider_rw.commit()?;
1797
1798 let provider = BlockchainProvider::new(factory)?;
1799
1800 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1801 let chain = NewCanonicalChain::Commit {
1802 new: vec![in_memory_blocks
1803 .first()
1804 .map(|block| {
1805 let senders = block.senders().expect("failed to recover senders");
1806 ExecutedBlockWithTrieUpdates::new(
1807 Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1808 Arc::new(ExecutionOutcome {
1809 bundle: BundleState::new(
1810 in_memory_state.into_iter().map(|(address, (account, _))| {
1811 (address, None, Some(account.into()), Default::default())
1812 }),
1813 [in_memory_changesets.iter().map(|(address, account, _)| {
1814 (*address, Some(Some((*account).into())), Vec::new())
1815 })],
1816 [],
1817 ),
1818 first_block: first_in_memory_block,
1819 ..Default::default()
1820 }),
1821 Default::default(),
1822 Default::default(),
1823 )
1824 })
1825 .unwrap()],
1826 };
1827 provider.canonical_in_memory_state.update_chain(chain);
1828
1829 let consistent_provider = provider.consistent_provider()?;
1830
1831 assert_eq!(
1832 consistent_provider.account_block_changeset(last_database_block).unwrap(),
1833 database_changesets
1834 .into_iter()
1835 .next_back()
1836 .unwrap()
1837 .into_iter()
1838 .sorted_by_key(|(address, _, _)| *address)
1839 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1840 .collect::<Vec<_>>()
1841 );
1842 assert_eq!(
1843 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1844 in_memory_changesets
1845 .into_iter()
1846 .sorted_by_key(|(address, _, _)| *address)
1847 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1848 .collect::<Vec<_>>()
1849 );
1850
1851 Ok(())
1852 }
1853}