1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
5 BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider, ProviderError,
6 PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader,
7 StateReader, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, TrieReader,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
12 HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::ChainInfo;
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
24use reth_prune_types::{PruneCheckpoint, PruneSegment};
25use reth_stages_types::{StageCheckpoint, StageId};
26use reth_static_file_types::StaticFileSegment;
27use reth_storage_api::{
28 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
29 StorageChangeSetReader, TryIntoHistoricalStateProvider,
30};
31use reth_storage_errors::provider::ProviderResult;
32use reth_trie::updates::TrieUpdatesSorted;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
221 &self,
222 account_changeset: Vec<(u64, AccountBeforeTx)>,
223 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224 block_range_end: BlockNumber,
225 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226 let mut state: BundleStateInit = HashMap::default();
227 let mut reverts: RevertsInit = HashMap::default();
228 let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230 for (block_number, account_before) in account_changeset.into_iter().rev() {
232 let AccountBeforeTx { info: old_info, address } = account_before;
233 match state.entry(address) {
234 hash_map::Entry::Vacant(entry) => {
235 let new_info = state_provider.basic_account(&address)?;
236 entry.insert((old_info, new_info, HashMap::default()));
237 }
238 hash_map::Entry::Occupied(mut entry) => {
239 entry.get_mut().0 = old_info;
241 }
242 }
243 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245 }
246
247 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249 let BlockNumberAddress((block_number, address)) = block_and_address;
250 let account_state = match state.entry(address) {
252 hash_map::Entry::Vacant(entry) => {
253 let present_info = state_provider.basic_account(&address)?;
254 entry.insert((present_info, present_info, HashMap::default()))
255 }
256 hash_map::Entry::Occupied(entry) => entry.into_mut(),
257 };
258
259 match account_state.2.entry(old_storage.key) {
261 hash_map::Entry::Vacant(entry) => {
262 let new_storage_value =
263 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264 entry.insert((old_storage.value, new_storage_value));
265 }
266 hash_map::Entry::Occupied(mut entry) => {
267 entry.get_mut().0 = old_storage.value;
268 }
269 };
270
271 reverts
272 .entry(block_number)
273 .or_default()
274 .entry(address)
275 .or_default()
276 .1
277 .push(old_storage);
278 }
279
280 Ok((state, reverts))
281 }
282
283 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295 &self,
296 range: impl RangeBounds<BlockNumber>,
297 fetch_db_range: F,
298 map_block_state_item: G,
299 mut predicate: P,
300 ) -> ProviderResult<Vec<T>>
301 where
302 F: FnOnce(
303 &DatabaseProviderRO<N::DB, N>,
304 RangeInclusive<BlockNumber>,
305 &mut P,
306 ) -> ProviderResult<Vec<T>>,
307 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308 P: FnMut(&T) -> bool,
309 {
310 let mut in_memory_chain =
318 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319 let db_provider = &self.storage_provider;
320
321 let (start, end) = self.convert_range_bounds(range, || {
322 in_memory_chain
324 .first()
325 .map(|b| b.number())
326 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327 });
328
329 if start > end {
330 return Ok(vec![])
331 }
332
333 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338 Some(lowest_memory_block) if lowest_memory_block <= end => {
339 let highest_memory_block =
340 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342 let in_memory_range =
346 lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348 in_memory_chain.truncate(
351 in_memory_chain
352 .len()
353 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354 );
355
356 let storage_range =
357 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359 (Some((in_memory_chain, in_memory_range)), storage_range)
360 }
361 _ => {
362 drop(in_memory_chain);
364
365 (None, Some(start..=end))
366 }
367 };
368
369 let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371 if let Some(storage_range) = storage_range {
372 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373 items.append(&mut db_items);
374
375 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378 return Ok(items)
379 }
380 }
381
382 if let Some((in_memory_chain, in_memory_range)) = in_memory {
383 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384 debug_assert!(num == block.number());
385 if let Some(item) = map_block_state_item(block, &mut predicate) {
386 items.push(item);
387 } else {
388 break
389 }
390 }
391 }
392
393 Ok(items)
394 }
395
396 fn block_state_provider_ref(
398 &self,
399 state: &BlockState<N::Primitives>,
400 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401 let anchor_hash = state.anchor().hash;
402 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405 }
406
407 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413 &self,
414 range: impl RangeBounds<BlockNumber>,
415 fetch_from_db: S,
416 fetch_from_block_state: M,
417 ) -> ProviderResult<Vec<R>>
418 where
419 S: FnOnce(
420 &DatabaseProviderRO<N::DB, N>,
421 RangeInclusive<TxNumber>,
422 ) -> ProviderResult<Vec<R>>,
423 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424 {
425 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426 let provider = &self.storage_provider;
427
428 let last_database_block_number = in_mem_chain
431 .last()
432 .map(|b| Ok(b.anchor().number))
433 .unwrap_or_else(|| provider.last_block_number())?;
434
435 let last_block_body_index = provider
438 .block_body_indices(last_database_block_number)?
439 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442 let (start, end) = self.convert_range_bounds(range, || {
443 in_mem_chain
444 .iter()
445 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446 .sum::<u64>() +
447 last_block_body_index.last_tx_num()
448 });
449
450 if start > end {
451 return Ok(vec![])
452 }
453
454 let mut tx_range = start..=end;
455
456 if *tx_range.end() < in_memory_tx_num {
459 return fetch_from_db(provider, tx_range);
460 }
461
462 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464 if *tx_range.start() < in_memory_tx_num {
466 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469 tx_range = in_memory_tx_num..=*tx_range.end();
471
472 items.extend(fetch_from_db(provider, db_range)?);
473 }
474
475 for block_state in in_mem_chain.iter().rev() {
477 let block_tx_count =
478 block_state.block_ref().recovered_block().body().transactions().len();
479 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484 in_memory_tx_num += block_tx_count as u64;
485 continue
486 }
487
488 let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491 items.extend(fetch_from_block_state(
492 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493 block_state,
494 )?);
495
496 in_memory_tx_num += block_tx_count as u64;
497
498 if in_memory_tx_num > *tx_range.end() {
500 break
501 }
502
503 tx_range = in_memory_tx_num..=*tx_range.end();
505 }
506
507 Ok(items)
508 }
509
510 fn get_in_memory_or_storage_by_tx<S, M, R>(
513 &self,
514 id: HashOrNumber,
515 fetch_from_db: S,
516 fetch_from_block_state: M,
517 ) -> ProviderResult<Option<R>>
518 where
519 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521 {
522 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523 let provider = &self.storage_provider;
524
525 let last_database_block_number = in_mem_chain
528 .last()
529 .map(|b| Ok(b.anchor().number))
530 .unwrap_or_else(|| provider.last_block_number())?;
531
532 let last_block_body_index = provider
535 .block_body_indices(last_database_block_number)?
536 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539 if let HashOrNumber::Number(id) = id &&
542 id < in_memory_tx_num
543 {
544 return fetch_from_db(provider)
545 }
546
547 for block_state in in_mem_chain.iter().rev() {
549 let executed_block = block_state.block_ref();
550 let block = executed_block.recovered_block();
551
552 for tx_index in 0..block.body().transactions().len() {
553 match id {
554 HashOrNumber::Hash(tx_hash) => {
555 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557 }
558 }
559 HashOrNumber::Number(id) => {
560 if id == in_memory_tx_num {
561 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562 }
563 }
564 }
565
566 in_memory_tx_num += 1;
567 }
568 }
569
570 if let HashOrNumber::Hash(_) = id {
572 return fetch_from_db(provider)
573 }
574
575 Ok(None)
576 }
577
578 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580 &self,
581 id: BlockHashOrNumber,
582 fetch_from_db: S,
583 fetch_from_block_state: M,
584 ) -> ProviderResult<R>
585 where
586 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588 {
589 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590 return fetch_from_block_state(block_state)
591 }
592 fetch_from_db(&self.storage_provider)
593 }
594
595 pub(crate) fn into_state_provider_at_block_hash(
597 self,
598 block_hash: BlockHash,
599 ) -> ProviderResult<Box<dyn StateProvider>> {
600 let Self { storage_provider, head_block, .. } = self;
601 let into_history_at_block_hash = |block_hash| -> ProviderResult<Box<dyn StateProvider>> {
602 let block_number = storage_provider
603 .block_number(block_hash)?
604 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605 storage_provider.try_into_history_at_block(block_number)
606 };
607 if let Some(Some(block_state)) =
608 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609 {
610 let anchor_hash = block_state.anchor().hash;
611 let latest_historical = into_history_at_block_hash(anchor_hash)?;
612 return Ok(Box::new(block_state.state_provider(latest_historical)));
613 }
614 into_history_at_block_hash(block_hash)
615 }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619 #[inline]
628 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629 let latest = self.best_block_number()?;
630 if block_number > latest {
631 Err(ProviderError::HeaderNotFound(block_number.into()))
632 } else {
633 Ok(())
634 }
635 }
636}
637
638impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
639 type Primitives = N::Primitives;
640}
641
642impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
643 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
644 self.storage_provider.static_file_provider()
645 }
646
647 fn get_static_file_writer(
648 &self,
649 block: BlockNumber,
650 segment: StaticFileSegment,
651 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
652 self.storage_provider.get_static_file_writer(block, segment)
653 }
654}
655
656impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
657 type Header = HeaderTy<N>;
658
659 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
660 self.get_in_memory_or_storage_by_block(
661 block_hash.into(),
662 |db_provider| db_provider.header(block_hash),
663 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664 )
665 }
666
667 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
668 self.get_in_memory_or_storage_by_block(
669 num.into(),
670 |db_provider| db_provider.header_by_number(num),
671 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
672 )
673 }
674
675 fn headers_range(
676 &self,
677 range: impl RangeBounds<BlockNumber>,
678 ) -> ProviderResult<Vec<Self::Header>> {
679 self.get_in_memory_or_storage_by_block_range_while(
680 range,
681 |db_provider, range, _| db_provider.headers_range(range),
682 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
683 |_| true,
684 )
685 }
686
687 fn sealed_header(
688 &self,
689 number: BlockNumber,
690 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
691 self.get_in_memory_or_storage_by_block(
692 number.into(),
693 |db_provider| db_provider.sealed_header(number),
694 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
695 )
696 }
697
698 fn sealed_headers_range(
699 &self,
700 range: impl RangeBounds<BlockNumber>,
701 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
702 self.get_in_memory_or_storage_by_block_range_while(
703 range,
704 |db_provider, range, _| db_provider.sealed_headers_range(range),
705 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
706 |_| true,
707 )
708 }
709
710 fn sealed_headers_while(
711 &self,
712 range: impl RangeBounds<BlockNumber>,
713 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
714 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
715 self.get_in_memory_or_storage_by_block_range_while(
716 range,
717 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
718 |block_state, predicate| {
719 let header = block_state.block_ref().recovered_block().sealed_header();
720 predicate(header).then(|| header.clone())
721 },
722 predicate,
723 )
724 }
725}
726
727impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
728 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
729 self.get_in_memory_or_storage_by_block(
730 number.into(),
731 |db_provider| db_provider.block_hash(number),
732 |block_state| Ok(Some(block_state.hash())),
733 )
734 }
735
736 fn canonical_hashes_range(
737 &self,
738 start: BlockNumber,
739 end: BlockNumber,
740 ) -> ProviderResult<Vec<B256>> {
741 self.get_in_memory_or_storage_by_block_range_while(
742 start..end,
743 |db_provider, inclusive_range, _| {
744 db_provider
745 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
746 },
747 |block_state, _| Some(block_state.hash()),
748 |_| true,
749 )
750 }
751}
752
753impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
754 fn chain_info(&self) -> ProviderResult<ChainInfo> {
755 let best_number = self.best_block_number()?;
756 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
757 }
758
759 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
760 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
761 }
762
763 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
764 self.storage_provider.last_block_number()
765 }
766
767 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
768 self.get_in_memory_or_storage_by_block(
769 hash.into(),
770 |db_provider| db_provider.block_number(hash),
771 |block_state| Ok(Some(block_state.number())),
772 )
773 }
774}
775
776impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
777 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
778 Ok(self.canonical_in_memory_state.pending_block_num_hash())
779 }
780
781 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782 Ok(self.canonical_in_memory_state.get_safe_num_hash())
783 }
784
785 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
787 }
788}
789
790impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
791 type Block = BlockTy<N>;
792
793 fn find_block_by_hash(
794 &self,
795 hash: B256,
796 source: BlockSource,
797 ) -> ProviderResult<Option<Self::Block>> {
798 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
799 let Some(block) = self.get_in_memory_or_storage_by_block(
800 hash.into(),
801 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
802 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
803 )?
804 {
805 return Ok(Some(block))
806 }
807
808 if matches!(source, BlockSource::Pending | BlockSource::Any) {
809 return Ok(self
810 .canonical_in_memory_state
811 .pending_block()
812 .filter(|b| b.hash() == hash)
813 .map(|b| b.into_block()))
814 }
815
816 Ok(None)
817 }
818
819 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
820 self.get_in_memory_or_storage_by_block(
821 id,
822 |db_provider| db_provider.block(id),
823 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824 )
825 }
826
827 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
828 Ok(self.canonical_in_memory_state.pending_recovered_block())
829 }
830
831 fn pending_block_and_receipts(
832 &self,
833 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
834 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
835 }
836
837 fn recovered_block(
844 &self,
845 id: BlockHashOrNumber,
846 transaction_kind: TransactionVariant,
847 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
848 self.get_in_memory_or_storage_by_block(
849 id,
850 |db_provider| db_provider.recovered_block(id, transaction_kind),
851 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
852 )
853 }
854
855 fn sealed_block_with_senders(
856 &self,
857 id: BlockHashOrNumber,
858 transaction_kind: TransactionVariant,
859 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860 self.get_in_memory_or_storage_by_block(
861 id,
862 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
863 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864 )
865 }
866
867 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
868 self.get_in_memory_or_storage_by_block_range_while(
869 range,
870 |db_provider, range, _| db_provider.block_range(range),
871 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
872 |_| true,
873 )
874 }
875
876 fn block_with_senders_range(
877 &self,
878 range: RangeInclusive<BlockNumber>,
879 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
880 self.get_in_memory_or_storage_by_block_range_while(
881 range,
882 |db_provider, range, _| db_provider.block_with_senders_range(range),
883 |block_state, _| Some(block_state.block().recovered_block().clone()),
884 |_| true,
885 )
886 }
887
888 fn recovered_block_range(
889 &self,
890 range: RangeInclusive<BlockNumber>,
891 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892 self.get_in_memory_or_storage_by_block_range_while(
893 range,
894 |db_provider, range, _| db_provider.recovered_block_range(range),
895 |block_state, _| Some(block_state.block().recovered_block().clone()),
896 |_| true,
897 )
898 }
899
900 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
901 self.get_in_memory_or_storage_by_tx(
902 id.into(),
903 |db_provider| db_provider.block_by_transaction_id(id),
904 |_, _, block_state| Ok(Some(block_state.number())),
905 )
906 }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910 type Transaction = TxTy<N>;
911
912 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913 self.get_in_memory_or_storage_by_tx(
914 tx_hash.into(),
915 |db_provider| db_provider.transaction_id(tx_hash),
916 |_, tx_number, _| Ok(Some(tx_number)),
917 )
918 }
919
920 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921 self.get_in_memory_or_storage_by_tx(
922 id.into(),
923 |provider| provider.transaction_by_id(id),
924 |tx_index, _, block_state| {
925 Ok(block_state
926 .block_ref()
927 .recovered_block()
928 .body()
929 .transactions()
930 .get(tx_index)
931 .cloned())
932 },
933 )
934 }
935
936 fn transaction_by_id_unhashed(
937 &self,
938 id: TxNumber,
939 ) -> ProviderResult<Option<Self::Transaction>> {
940 self.get_in_memory_or_storage_by_tx(
941 id.into(),
942 |provider| provider.transaction_by_id_unhashed(id),
943 |tx_index, _, block_state| {
944 Ok(block_state
945 .block_ref()
946 .recovered_block()
947 .body()
948 .transactions()
949 .get(tx_index)
950 .cloned())
951 },
952 )
953 }
954
955 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957 return Ok(Some(tx))
958 }
959
960 self.storage_provider.transaction_by_hash(hash)
961 }
962
963 fn transaction_by_hash_with_meta(
964 &self,
965 tx_hash: TxHash,
966 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967 if let Some((tx, meta)) =
968 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969 {
970 return Ok(Some((tx, meta)))
971 }
972
973 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974 }
975
976 fn transactions_by_block(
977 &self,
978 id: BlockHashOrNumber,
979 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
980 self.get_in_memory_or_storage_by_block(
981 id,
982 |provider| provider.transactions_by_block(id),
983 |block_state| {
984 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
985 },
986 )
987 }
988
989 fn transactions_by_block_range(
990 &self,
991 range: impl RangeBounds<BlockNumber>,
992 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
993 self.get_in_memory_or_storage_by_block_range_while(
994 range,
995 |db_provider, range, _| db_provider.transactions_by_block_range(range),
996 |block_state, _| {
997 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
998 },
999 |_| true,
1000 )
1001 }
1002
1003 fn transactions_by_tx_range(
1004 &self,
1005 range: impl RangeBounds<TxNumber>,
1006 ) -> ProviderResult<Vec<Self::Transaction>> {
1007 self.get_in_memory_or_storage_by_tx_range(
1008 range,
1009 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1010 |index_range, block_state| {
1011 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1012 .to_vec())
1013 },
1014 )
1015 }
1016
1017 fn senders_by_tx_range(
1018 &self,
1019 range: impl RangeBounds<TxNumber>,
1020 ) -> ProviderResult<Vec<Address>> {
1021 self.get_in_memory_or_storage_by_tx_range(
1022 range,
1023 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1024 |index_range, block_state| {
1025 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1026 },
1027 )
1028 }
1029
1030 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1031 self.get_in_memory_or_storage_by_tx(
1032 id.into(),
1033 |provider| provider.transaction_sender(id),
1034 |tx_index, _, block_state| {
1035 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1036 },
1037 )
1038 }
1039}
1040
1041impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1042 type Receipt = ReceiptTy<N>;
1043
1044 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1045 self.get_in_memory_or_storage_by_tx(
1046 id.into(),
1047 |provider| provider.receipt(id),
1048 |tx_index, _, block_state| {
1049 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1050 },
1051 )
1052 }
1053
1054 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1055 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1056 let executed_block = block_state.block_ref();
1057 let block = executed_block.recovered_block();
1058 let receipts = block_state.executed_block_receipts();
1059
1060 debug_assert_eq!(
1062 block.body().transactions().len(),
1063 receipts.len(),
1064 "Mismatch between transaction and receipt count"
1065 );
1066
1067 if let Some(tx_index) =
1068 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1069 {
1070 return Ok(receipts.get(tx_index).cloned());
1072 }
1073 }
1074
1075 self.storage_provider.receipt_by_hash(hash)
1076 }
1077
1078 fn receipts_by_block(
1079 &self,
1080 block: BlockHashOrNumber,
1081 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1082 self.get_in_memory_or_storage_by_block(
1083 block,
1084 |db_provider| db_provider.receipts_by_block(block),
1085 |block_state| Ok(Some(block_state.executed_block_receipts())),
1086 )
1087 }
1088
1089 fn receipts_by_tx_range(
1090 &self,
1091 range: impl RangeBounds<TxNumber>,
1092 ) -> ProviderResult<Vec<Self::Receipt>> {
1093 self.get_in_memory_or_storage_by_tx_range(
1094 range,
1095 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1096 |index_range, block_state| {
1097 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1098 },
1099 )
1100 }
1101
1102 fn receipts_by_block_range(
1103 &self,
1104 block_range: RangeInclusive<BlockNumber>,
1105 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1106 self.storage_provider.receipts_by_block_range(block_range)
1107 }
1108}
1109
1110impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1111 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1112 match block {
1113 BlockId::Hash(rpc_block_hash) => {
1114 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1115 if receipts.is_none() &&
1116 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1117 let Some(state) = self
1118 .head_block
1119 .as_ref()
1120 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121 {
1122 receipts = Some(state.executed_block_receipts());
1123 }
1124 Ok(receipts)
1125 }
1126 BlockId::Number(num_tag) => match num_tag {
1127 BlockNumberOrTag::Pending => Ok(self
1128 .canonical_in_memory_state
1129 .pending_state()
1130 .map(|block_state| block_state.executed_block_receipts())),
1131 _ => {
1132 if let Some(num) = self.convert_block_number(num_tag)? {
1133 self.receipts_by_block(num.into())
1134 } else {
1135 Ok(None)
1136 }
1137 }
1138 },
1139 }
1140 }
1141}
1142
1143impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1144 fn block_body_indices(
1145 &self,
1146 number: BlockNumber,
1147 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1148 self.get_in_memory_or_storage_by_block(
1149 number.into(),
1150 |db_provider| db_provider.block_body_indices(number),
1151 |block_state| {
1152 let last_storage_block_number = block_state.anchor().number;
1154 let mut stored_indices = self
1155 .storage_provider
1156 .block_body_indices(last_storage_block_number)?
1157 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1158
1159 stored_indices.first_tx_num = stored_indices.next_tx_num();
1161 stored_indices.tx_count = 0;
1162
1163 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1165 let block_tx_count =
1166 state.block_ref().recovered_block().body().transactions().len() as u64;
1167 if state.block_ref().recovered_block().number() == number {
1168 stored_indices.tx_count = block_tx_count;
1169 } else {
1170 stored_indices.first_tx_num += block_tx_count;
1171 }
1172 }
1173
1174 Ok(Some(stored_indices))
1175 },
1176 )
1177 }
1178
1179 fn block_body_indices_range(
1180 &self,
1181 range: RangeInclusive<BlockNumber>,
1182 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1183 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1184 }
1185}
1186
1187impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1188 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1189 self.storage_provider.get_stage_checkpoint(id)
1190 }
1191
1192 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1193 self.storage_provider.get_stage_checkpoint_progress(id)
1194 }
1195
1196 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1197 self.storage_provider.get_all_checkpoints()
1198 }
1199}
1200
1201impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1202 fn get_prune_checkpoint(
1203 &self,
1204 segment: PruneSegment,
1205 ) -> ProviderResult<Option<PruneCheckpoint>> {
1206 self.storage_provider.get_prune_checkpoint(segment)
1207 }
1208
1209 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1210 self.storage_provider.get_prune_checkpoints()
1211 }
1212}
1213
1214impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1215 type ChainSpec = N::ChainSpec;
1216
1217 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1218 ChainSpecProvider::chain_spec(&self.storage_provider)
1219 }
1220}
1221
1222impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1223 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1224 match id {
1225 BlockId::Number(num) => self.block_by_number_or_tag(num),
1226 BlockId::Hash(hash) => {
1227 if Some(true) == hash.require_canonical {
1232 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1234 } else {
1235 self.block_by_hash(hash.block_hash)
1236 }
1237 }
1238 }
1239 }
1240
1241 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1242 Ok(match id {
1243 BlockNumberOrTag::Latest => {
1244 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1245 }
1246 BlockNumberOrTag::Finalized => {
1247 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1248 }
1249 BlockNumberOrTag::Safe => {
1250 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1251 }
1252 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1253 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1254
1255 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1256 })
1257 }
1258
1259 fn sealed_header_by_number_or_tag(
1260 &self,
1261 id: BlockNumberOrTag,
1262 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1263 match id {
1264 BlockNumberOrTag::Latest => {
1265 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1266 }
1267 BlockNumberOrTag::Finalized => {
1268 Ok(self.canonical_in_memory_state.get_finalized_header())
1269 }
1270 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1271 BlockNumberOrTag::Earliest => self
1272 .header_by_number(self.earliest_block_number()?)?
1273 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1274 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1275 BlockNumberOrTag::Number(num) => self
1276 .header_by_number(num)?
1277 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1278 }
1279 }
1280
1281 fn sealed_header_by_id(
1282 &self,
1283 id: BlockId,
1284 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1285 Ok(match id {
1286 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1287 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1288 })
1289 }
1290
1291 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1292 Ok(match id {
1293 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1294 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1295 })
1296 }
1297}
1298
1299impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1300 fn storage_changeset(
1301 &self,
1302 block_number: BlockNumber,
1303 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1304 if let Some(state) =
1305 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1306 {
1307 let changesets = state
1308 .block()
1309 .execution_output
1310 .bundle
1311 .reverts
1312 .clone()
1313 .to_plain_state_reverts()
1314 .storage
1315 .into_iter()
1316 .flatten()
1317 .flat_map(|revert: PlainStorageRevert| {
1318 revert.storage_revert.into_iter().map(move |(key, value)| {
1319 (
1320 BlockNumberAddress((block_number, revert.address)),
1321 StorageEntry { key: key.into(), value: value.to_previous_value() },
1322 )
1323 })
1324 })
1325 .collect();
1326 Ok(changesets)
1327 } else {
1328 let storage_history_exists = self
1332 .storage_provider
1333 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1334 .and_then(|checkpoint| {
1335 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1340 })
1341 .unwrap_or(true);
1342
1343 if !storage_history_exists {
1344 return Err(ProviderError::StateAtBlockPruned(block_number))
1345 }
1346
1347 self.storage_provider.storage_changeset(block_number)
1348 }
1349 }
1350}
1351
1352impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1353 fn account_block_changeset(
1354 &self,
1355 block_number: BlockNumber,
1356 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1357 if let Some(state) =
1358 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1359 {
1360 let changesets = state
1361 .block_ref()
1362 .execution_output
1363 .bundle
1364 .reverts
1365 .clone()
1366 .to_plain_state_reverts()
1367 .accounts
1368 .into_iter()
1369 .flatten()
1370 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1371 .collect();
1372 Ok(changesets)
1373 } else {
1374 let account_history_exists = self
1378 .storage_provider
1379 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1380 .and_then(|checkpoint| {
1381 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1386 })
1387 .unwrap_or(true);
1388
1389 if !account_history_exists {
1390 return Err(ProviderError::StateAtBlockPruned(block_number))
1391 }
1392
1393 self.storage_provider.account_block_changeset(block_number)
1394 }
1395 }
1396
1397 fn get_account_before_block(
1398 &self,
1399 block_number: BlockNumber,
1400 address: Address,
1401 ) -> ProviderResult<Option<AccountBeforeTx>> {
1402 if let Some(state) =
1403 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1404 {
1405 let changeset = state
1407 .block_ref()
1408 .execution_output
1409 .bundle
1410 .reverts
1411 .clone()
1412 .to_plain_state_reverts()
1413 .accounts
1414 .into_iter()
1415 .flatten()
1416 .find(|(addr, _)| addr == &address)
1417 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1418 Ok(changeset)
1419 } else {
1420 let account_history_exists = self
1423 .storage_provider
1424 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1425 .and_then(|checkpoint| {
1426 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1431 })
1432 .unwrap_or(true);
1433
1434 if !account_history_exists {
1435 return Err(ProviderError::StateAtBlockPruned(block_number))
1436 }
1437
1438 self.storage_provider.get_account_before_block(block_number, address)
1440 }
1441 }
1442}
1443
1444impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1445 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1447 let state_provider = self.latest_ref()?;
1449 state_provider.basic_account(address)
1450 }
1451}
1452
1453impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1454 type Receipt = ReceiptTy<N>;
1455
1456 fn get_state(
1466 &self,
1467 block: BlockNumber,
1468 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1469 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1470 let state = state.block_ref().execution_outcome().clone();
1471 Ok(Some(state))
1472 } else {
1473 Self::get_state(self, block..=block)
1474 }
1475 }
1476}
1477
1478impl<N: ProviderNodeTypes> TrieReader for ConsistentProvider<N> {
1479 fn trie_reverts(&self, from: BlockNumber) -> ProviderResult<TrieUpdatesSorted> {
1480 self.storage_provider.trie_reverts(from)
1481 }
1482
1483 fn get_block_trie_updates(
1484 &self,
1485 block_number: BlockNumber,
1486 ) -> ProviderResult<TrieUpdatesSorted> {
1487 self.storage_provider.get_block_trie_updates(block_number)
1488 }
1489}
1490
1491#[cfg(test)]
1492mod tests {
1493 use crate::{
1494 providers::blockchain_provider::BlockchainProvider,
1495 test_utils::create_test_provider_factory, BlockWriter,
1496 };
1497 use alloy_eips::BlockHashOrNumber;
1498 use alloy_primitives::B256;
1499 use itertools::Itertools;
1500 use rand::Rng;
1501 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1502 use reth_db_api::models::AccountBeforeTx;
1503 use reth_ethereum_primitives::Block;
1504 use reth_execution_types::ExecutionOutcome;
1505 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1506 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1507 use reth_testing_utils::generators::{
1508 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1509 };
1510 use revm_database::BundleState;
1511 use std::{
1512 ops::{Bound, Range, RangeBounds},
1513 sync::Arc,
1514 };
1515
/// Number of blocks the tests request for the database-backed part of a generated chain.
const TEST_BLOCKS_COUNT: usize = 5;
1517
1518 fn random_blocks(
1519 rng: &mut impl Rng,
1520 database_blocks: usize,
1521 in_memory_blocks: usize,
1522 requests_count: Option<Range<u8>>,
1523 withdrawals_count: Option<Range<u8>>,
1524 tx_count: impl RangeBounds<u8>,
1525 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1526 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1527
1528 let tx_start = match tx_count.start_bound() {
1529 Bound::Included(&n) | Bound::Excluded(&n) => n,
1530 Bound::Unbounded => u8::MIN,
1531 };
1532 let tx_end = match tx_count.end_bound() {
1533 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1534 Bound::Unbounded => u8::MAX,
1535 };
1536
1537 let blocks = random_block_range(
1538 rng,
1539 0..=block_range,
1540 BlockRangeParams {
1541 parent: Some(B256::ZERO),
1542 tx_count: tx_start..tx_end,
1543 requests_count,
1544 withdrawals_count,
1545 },
1546 );
1547 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1548 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1549 }
1550
#[test]
fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
    let mut rng = generators::rng();
    let factory = create_test_provider_factory();

    // Eleven random blocks: the first five are written to the database, the rest are only
    // used for the in-memory and pending state.
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() };
    let generated = random_block_range(&mut rng, 0..=10, params);
    let (persisted, unpersisted) = generated.split_at(5);

    let writer = factory.provider_rw()?;
    for block in persisted {
        let recovered = block.clone().try_recover().expect("failed to seal block with senders");
        writer.insert_block(recovered)?;
    }
    writer.commit()?;

    let provider = BlockchainProvider::new(factory)?;
    let snapshot = provider.consistent_provider()?;

    let db_first = persisted.first().unwrap();
    let mem_first = unpersisted.first().unwrap();
    let mem_last = unpersisted.last().unwrap();

    // Before the in-memory state is populated, its first block is unknown to every source.
    for source in [BlockSource::Any, BlockSource::Canonical, BlockSource::Pending] {
        assert_eq!(snapshot.find_block_by_hash(mem_first.hash(), source)?, None);
    }

    // Commit the first non-persisted block to the in-memory state and take a fresh snapshot.
    let executed = ExecutedBlock {
        recovered_block: Arc::new(RecoveredBlock::new_sealed(
            mem_first.clone(),
            mem_first.senders().expect("failed to recover senders"),
        )),
        ..Default::default()
    };
    snapshot
        .canonical_in_memory_state
        .update_chain(NewCanonicalChain::Commit { new: vec![executed] });
    let snapshot = provider.consistent_provider()?;

    // The in-memory block is now visible ...
    let expected_mem_first = mem_first.clone().into_block();
    for source in [BlockSource::Any, BlockSource::Canonical] {
        assert_eq!(
            snapshot.find_block_by_hash(mem_first.hash(), source)?,
            Some(expected_mem_first.clone())
        );
    }

    // ... and so is the database block.
    let expected_db_first = db_first.clone().into_block();
    for source in [BlockSource::Any, BlockSource::Canonical] {
        assert_eq!(
            snapshot.find_block_by_hash(db_first.hash(), source)?,
            Some(expected_db_first.clone())
        );
    }

    // A database block is not reported as pending.
    assert_eq!(snapshot.find_block_by_hash(db_first.hash(), BlockSource::Pending)?, None);

    // Register the last generated block as the pending block.
    let pending = ExecutedBlock {
        recovered_block: Arc::new(RecoveredBlock::new_sealed(
            mem_last.clone(),
            Default::default(),
        )),
        ..Default::default()
    };
    provider.canonical_in_memory_state.set_pending_block(pending);

    // It can now be found through the pending source.
    assert_eq!(
        snapshot.find_block_by_hash(mem_last.hash(), BlockSource::Pending)?,
        Some(mem_last.clone_block())
    );

    Ok(())
}
1661
#[test]
fn test_block_reader_block() -> eyre::Result<()> {
    let mut rng = generators::rng();
    let factory = create_test_provider_factory();

    // Eleven random blocks: the first five are written to the database, the rest are only
    // used for the in-memory state.
    let params =
        BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() };
    let generated = random_block_range(&mut rng, 0..=10, params);
    let (persisted, unpersisted) = generated.split_at(5);

    let writer = factory.provider_rw()?;
    for block in persisted {
        let recovered = block.clone().try_recover().expect("failed to seal block with senders");
        writer.insert_block(recovered)?;
    }
    writer.commit()?;

    let provider = BlockchainProvider::new(factory)?;
    let snapshot = provider.consistent_provider()?;

    let mem_block = unpersisted.first().unwrap();
    let db_block = persisted.first().unwrap();

    let mem_ids = [
        BlockHashOrNumber::Hash(mem_block.hash()),
        BlockHashOrNumber::Number(mem_block.number),
    ];
    let db_ids =
        [BlockHashOrNumber::Hash(db_block.hash()), BlockHashOrNumber::Number(db_block.number)];

    // Before the in-memory state is populated, the block is found neither by hash nor by
    // number.
    for id in mem_ids {
        assert_eq!(snapshot.block(id)?, None);
    }

    // Commit the first non-persisted block to the in-memory state and take a fresh snapshot.
    let executed = ExecutedBlock {
        recovered_block: Arc::new(RecoveredBlock::new_sealed(
            mem_block.clone(),
            mem_block.senders().expect("failed to recover senders"),
        )),
        ..Default::default()
    };
    snapshot
        .canonical_in_memory_state
        .update_chain(NewCanonicalChain::Commit { new: vec![executed] });

    let snapshot = provider.consistent_provider()?;

    // The in-memory block is now returned for both identifiers ...
    let expected_mem = mem_block.clone().into_block();
    for id in mem_ids {
        assert_eq!(snapshot.block(id)?, Some(expected_mem.clone()));
    }

    // ... and so is the database block.
    let expected_db = db_block.clone().into_block();
    for id in db_ids {
        assert_eq!(snapshot.block(id)?, Some(expected_db.clone()));
    }

    Ok(())
}
1742
#[test]
fn test_changeset_reader() -> eyre::Result<()> {
    let mut rng = generators::rng();

    let (db_blocks, mem_blocks) =
        random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

    let db_first_number = db_blocks.first().unwrap().number;
    let db_last_number = db_blocks.last().unwrap().number;
    let mem_first_number = mem_blocks.first().unwrap().number;

    let accounts = random_eoa_accounts(&mut rng, 2);

    // Changesets for the database blocks start from the fresh accounts; the in-memory
    // changesets continue from the state the database changesets ended with.
    let (db_changesets, db_state) = random_changeset_range(
        &mut rng,
        &db_blocks,
        accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
        0..0,
        0..0,
    );
    let (mem_changesets, mem_state) = random_changeset_range(
        &mut rng,
        &mem_blocks,
        db_state
            .iter()
            .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
        0..0,
        0..0,
    );

    // Persist the database blocks together with their state and reverts.
    let db_bundle = BundleState::new(
        db_state.into_iter().map(|(address, (account, _))| {
            (address, None, Some(account.into()), Default::default())
        }),
        db_changesets
            .iter()
            .map(|block_changesets| {
                block_changesets
                    .iter()
                    .map(|(address, account, _)| (*address, Some(Some((*account).into())), []))
            })
            .collect::<Vec<_>>(),
        Vec::new(),
    );
    let db_outcome =
        ExecutionOutcome { bundle: db_bundle, first_block: db_first_number, ..Default::default() };

    let factory = create_test_provider_factory();
    let writer = factory.provider_rw()?;
    writer.append_blocks_with_state(
        db_blocks
            .into_iter()
            .map(|b| b.try_recover().expect("failed to seal block with senders"))
            .collect(),
        &db_outcome,
        Default::default(),
    )?;
    writer.commit()?;

    let provider = BlockchainProvider::new(factory)?;

    // Commit the single in-memory block with its own state and reverts.
    let mem_changeset = mem_changesets.into_iter().next().unwrap();
    let mem_block = mem_blocks.first().unwrap();
    let mem_bundle = BundleState::new(
        mem_state.into_iter().map(|(address, (account, _))| {
            (address, None, Some(account.into()), Default::default())
        }),
        [mem_changeset
            .iter()
            .map(|(address, account, _)| (*address, Some(Some((*account).into())), Vec::new()))],
        [],
    );
    let executed = ExecutedBlock {
        recovered_block: Arc::new(RecoveredBlock::new_sealed(
            mem_block.clone(),
            mem_block.senders().expect("failed to recover senders"),
        )),
        execution_output: Arc::new(ExecutionOutcome {
            bundle: mem_bundle,
            first_block: mem_first_number,
            ..Default::default()
        }),
        ..Default::default()
    };
    provider
        .canonical_in_memory_state
        .update_chain(NewCanonicalChain::Commit { new: vec![executed] });

    let snapshot = provider.consistent_provider()?;

    // The last database block is answered with the last database changeset, sorted by
    // address.
    let expected_db: Vec<_> = db_changesets
        .into_iter()
        .next_back()
        .unwrap()
        .into_iter()
        .sorted_by_key(|(address, _, _)| *address)
        .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
        .collect();
    assert_eq!(snapshot.account_block_changeset(db_last_number).unwrap(), expected_db);

    // The in-memory block is answered with the in-memory changeset, sorted by address.
    let expected_mem: Vec<_> = mem_changeset
        .into_iter()
        .sorted_by_key(|(address, _, _)| *address)
        .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
        .collect();
    assert_eq!(snapshot.account_block_changeset(mem_first_number).unwrap(), expected_mem);

    Ok(())
}
1860}