1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{
16 map::{hash_map, HashMap},
17 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256,
18};
19use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
20use reth_chainspec::ChainInfo;
21use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
22use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
23use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
24use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
25use reth_prune_types::{PruneCheckpoint, PruneSegment};
26use reth_stages_types::{StageCheckpoint, StageId};
27use reth_static_file_types::StaticFileSegment;
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
30 StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
221 &self,
222 account_changeset: Vec<(u64, AccountBeforeTx)>,
223 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224 block_range_end: BlockNumber,
225 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226 let mut state: BundleStateInit = HashMap::default();
227 let mut reverts: RevertsInit = HashMap::default();
228 let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230 for (block_number, account_before) in account_changeset.into_iter().rev() {
232 let AccountBeforeTx { info: old_info, address } = account_before;
233 match state.entry(address) {
234 hash_map::Entry::Vacant(entry) => {
235 let new_info = state_provider.basic_account(&address)?;
236 entry.insert((old_info, new_info, HashMap::default()));
237 }
238 hash_map::Entry::Occupied(mut entry) => {
239 entry.get_mut().0 = old_info;
241 }
242 }
243 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245 }
246
247 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249 let BlockNumberAddress((block_number, address)) = block_and_address;
250 let account_state = match state.entry(address) {
252 hash_map::Entry::Vacant(entry) => {
253 let present_info = state_provider.basic_account(&address)?;
254 entry.insert((present_info, present_info, HashMap::default()))
255 }
256 hash_map::Entry::Occupied(entry) => entry.into_mut(),
257 };
258
259 match account_state.2.entry(old_storage.key) {
261 hash_map::Entry::Vacant(entry) => {
262 let new_storage_value =
263 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264 entry.insert((old_storage.value, new_storage_value));
265 }
266 hash_map::Entry::Occupied(mut entry) => {
267 entry.get_mut().0 = old_storage.value;
268 }
269 };
270
271 reverts
272 .entry(block_number)
273 .or_default()
274 .entry(address)
275 .or_default()
276 .1
277 .push(old_storage);
278 }
279
280 Ok((state, reverts))
281 }
282
283 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
295 &self,
296 range: impl RangeBounds<BlockNumber>,
297 fetch_db_range: F,
298 map_block_state_item: G,
299 mut predicate: P,
300 ) -> ProviderResult<Vec<T>>
301 where
302 F: FnOnce(
303 &DatabaseProviderRO<N::DB, N>,
304 RangeInclusive<BlockNumber>,
305 &mut P,
306 ) -> ProviderResult<Vec<T>>,
307 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
308 P: FnMut(&T) -> bool,
309 {
310 let mut in_memory_chain =
318 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
319 let db_provider = &self.storage_provider;
320
321 let (start, end) = self.convert_range_bounds(range, || {
322 in_memory_chain
324 .first()
325 .map(|b| b.number())
326 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
327 });
328
329 if start > end {
330 return Ok(vec![])
331 }
332
333 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
338 Some(lowest_memory_block) if lowest_memory_block <= end => {
339 let highest_memory_block =
340 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
341
342 let in_memory_range =
346 lowest_memory_block.max(start)..=end.min(highest_memory_block);
347
348 in_memory_chain.truncate(
351 in_memory_chain
352 .len()
353 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
354 );
355
356 let storage_range =
357 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
358
359 (Some((in_memory_chain, in_memory_range)), storage_range)
360 }
361 _ => {
362 drop(in_memory_chain);
364
365 (None, Some(start..=end))
366 }
367 };
368
369 let mut items = Vec::with_capacity((end - start + 1) as usize);
370
371 if let Some(storage_range) = storage_range {
372 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
373 items.append(&mut db_items);
374
375 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
378 return Ok(items)
379 }
380 }
381
382 if let Some((in_memory_chain, in_memory_range)) = in_memory {
383 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
384 debug_assert!(num == block.number());
385 if let Some(item) = map_block_state_item(block, &mut predicate) {
386 items.push(item);
387 } else {
388 break
389 }
390 }
391 }
392
393 Ok(items)
394 }
395
396 fn block_state_provider_ref(
398 &self,
399 state: &BlockState<N::Primitives>,
400 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
401 let anchor_hash = state.anchor().hash;
402 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
403 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
404 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
405 }
406
407 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
413 &self,
414 range: impl RangeBounds<BlockNumber>,
415 fetch_from_db: S,
416 fetch_from_block_state: M,
417 ) -> ProviderResult<Vec<R>>
418 where
419 S: FnOnce(
420 &DatabaseProviderRO<N::DB, N>,
421 RangeInclusive<TxNumber>,
422 ) -> ProviderResult<Vec<R>>,
423 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
424 {
425 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
426 let provider = &self.storage_provider;
427
428 let last_database_block_number = in_mem_chain
431 .last()
432 .map(|b| Ok(b.anchor().number))
433 .unwrap_or_else(|| provider.last_block_number())?;
434
435 let last_block_body_index = provider
438 .block_body_indices(last_database_block_number)?
439 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
440 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
441
442 let (start, end) = self.convert_range_bounds(range, || {
443 in_mem_chain
444 .iter()
445 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
446 .sum::<u64>() +
447 last_block_body_index.last_tx_num()
448 });
449
450 if start > end {
451 return Ok(vec![])
452 }
453
454 let mut tx_range = start..=end;
455
456 if *tx_range.end() < in_memory_tx_num {
459 return fetch_from_db(provider, tx_range);
460 }
461
462 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
463
464 if *tx_range.start() < in_memory_tx_num {
466 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
468
469 tx_range = in_memory_tx_num..=*tx_range.end();
471
472 items.extend(fetch_from_db(provider, db_range)?);
473 }
474
475 for block_state in in_mem_chain.iter().rev() {
477 let block_tx_count =
478 block_state.block_ref().recovered_block().body().transactions().len();
479 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
480
481 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
484 in_memory_tx_num += block_tx_count as u64;
485 continue
486 }
487
488 let skip = (tx_range.start() - in_memory_tx_num) as usize;
490
491 items.extend(fetch_from_block_state(
492 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
493 block_state,
494 )?);
495
496 in_memory_tx_num += block_tx_count as u64;
497
498 if in_memory_tx_num > *tx_range.end() {
500 break
501 }
502
503 tx_range = in_memory_tx_num..=*tx_range.end();
505 }
506
507 Ok(items)
508 }
509
510 fn get_in_memory_or_storage_by_tx<S, M, R>(
513 &self,
514 id: HashOrNumber,
515 fetch_from_db: S,
516 fetch_from_block_state: M,
517 ) -> ProviderResult<Option<R>>
518 where
519 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
520 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
521 {
522 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
523 let provider = &self.storage_provider;
524
525 let last_database_block_number = in_mem_chain
528 .last()
529 .map(|b| Ok(b.anchor().number))
530 .unwrap_or_else(|| provider.last_block_number())?;
531
532 let last_block_body_index = provider
535 .block_body_indices(last_database_block_number)?
536 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
537 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
538
539 if let HashOrNumber::Number(id) = id &&
542 id < in_memory_tx_num
543 {
544 return fetch_from_db(provider)
545 }
546
547 for block_state in in_mem_chain.iter().rev() {
549 let executed_block = block_state.block_ref();
550 let block = executed_block.recovered_block();
551
552 for tx_index in 0..block.body().transactions().len() {
553 match id {
554 HashOrNumber::Hash(tx_hash) => {
555 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
556 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
557 }
558 }
559 HashOrNumber::Number(id) => {
560 if id == in_memory_tx_num {
561 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
562 }
563 }
564 }
565
566 in_memory_tx_num += 1;
567 }
568 }
569
570 if let HashOrNumber::Hash(_) = id {
572 return fetch_from_db(provider)
573 }
574
575 Ok(None)
576 }
577
578 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
580 &self,
581 id: BlockHashOrNumber,
582 fetch_from_db: S,
583 fetch_from_block_state: M,
584 ) -> ProviderResult<R>
585 where
586 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
587 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
588 {
589 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
590 return fetch_from_block_state(block_state)
591 }
592 fetch_from_db(&self.storage_provider)
593 }
594
595 pub(crate) fn into_state_provider_at_block_hash(
597 self,
598 block_hash: BlockHash,
599 ) -> ProviderResult<StateProviderBox> {
600 let Self { storage_provider, head_block, .. } = self;
601 let into_history_at_block_hash = |block_hash| -> ProviderResult<StateProviderBox> {
602 let block_number = storage_provider
603 .block_number(block_hash)?
604 .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
605 storage_provider.try_into_history_at_block(block_number)
606 };
607 if let Some(Some(block_state)) =
608 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
609 {
610 let anchor_hash = block_state.anchor().hash;
611 let latest_historical = into_history_at_block_hash(anchor_hash)?;
612 return Ok(Box::new(block_state.state_provider(latest_historical)));
613 }
614 into_history_at_block_hash(block_hash)
615 }
616}
617
618impl<N: ProviderNodeTypes> ConsistentProvider<N> {
619 #[inline]
628 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
629 let latest = self.best_block_number()?;
630 if block_number > latest {
631 Err(ProviderError::HeaderNotFound(block_number.into()))
632 } else {
633 Ok(())
634 }
635 }
636}
637
638impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
639 type Primitives = N::Primitives;
640}
641
642impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
643 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
644 self.storage_provider.static_file_provider()
645 }
646
647 fn get_static_file_writer(
648 &self,
649 block: BlockNumber,
650 segment: StaticFileSegment,
651 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
652 self.storage_provider.get_static_file_writer(block, segment)
653 }
654}
655
656impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
657 type Header = HeaderTy<N>;
658
659 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
660 self.get_in_memory_or_storage_by_block(
661 block_hash.into(),
662 |db_provider| db_provider.header(block_hash),
663 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
664 )
665 }
666
667 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
668 self.get_in_memory_or_storage_by_block(
669 num.into(),
670 |db_provider| db_provider.header_by_number(num),
671 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
672 )
673 }
674
675 fn headers_range(
676 &self,
677 range: impl RangeBounds<BlockNumber>,
678 ) -> ProviderResult<Vec<Self::Header>> {
679 self.get_in_memory_or_storage_by_block_range_while(
680 range,
681 |db_provider, range, _| db_provider.headers_range(range),
682 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
683 |_| true,
684 )
685 }
686
687 fn sealed_header(
688 &self,
689 number: BlockNumber,
690 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
691 self.get_in_memory_or_storage_by_block(
692 number.into(),
693 |db_provider| db_provider.sealed_header(number),
694 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
695 )
696 }
697
698 fn sealed_headers_range(
699 &self,
700 range: impl RangeBounds<BlockNumber>,
701 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
702 self.get_in_memory_or_storage_by_block_range_while(
703 range,
704 |db_provider, range, _| db_provider.sealed_headers_range(range),
705 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
706 |_| true,
707 )
708 }
709
710 fn sealed_headers_while(
711 &self,
712 range: impl RangeBounds<BlockNumber>,
713 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
714 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
715 self.get_in_memory_or_storage_by_block_range_while(
716 range,
717 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
718 |block_state, predicate| {
719 let header = block_state.block_ref().recovered_block().sealed_header();
720 predicate(header).then(|| header.clone())
721 },
722 predicate,
723 )
724 }
725}
726
727impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
728 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
729 self.get_in_memory_or_storage_by_block(
730 number.into(),
731 |db_provider| db_provider.block_hash(number),
732 |block_state| Ok(Some(block_state.hash())),
733 )
734 }
735
736 fn canonical_hashes_range(
737 &self,
738 start: BlockNumber,
739 end: BlockNumber,
740 ) -> ProviderResult<Vec<B256>> {
741 self.get_in_memory_or_storage_by_block_range_while(
742 start..end,
743 |db_provider, inclusive_range, _| {
744 db_provider
745 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
746 },
747 |block_state, _| Some(block_state.hash()),
748 |_| true,
749 )
750 }
751}
752
753impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
754 fn chain_info(&self) -> ProviderResult<ChainInfo> {
755 let best_number = self.best_block_number()?;
756 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
757 }
758
759 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
760 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
761 }
762
763 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
764 self.storage_provider.last_block_number()
765 }
766
767 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
768 self.get_in_memory_or_storage_by_block(
769 hash.into(),
770 |db_provider| db_provider.block_number(hash),
771 |block_state| Ok(Some(block_state.number())),
772 )
773 }
774}
775
776impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
777 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
778 Ok(self.canonical_in_memory_state.pending_block_num_hash())
779 }
780
781 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782 Ok(self.canonical_in_memory_state.get_safe_num_hash())
783 }
784
785 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
787 }
788}
789
790impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
791 type Block = BlockTy<N>;
792
793 fn find_block_by_hash(
794 &self,
795 hash: B256,
796 source: BlockSource,
797 ) -> ProviderResult<Option<Self::Block>> {
798 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
799 let Some(block) = self.get_in_memory_or_storage_by_block(
800 hash.into(),
801 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
802 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
803 )?
804 {
805 return Ok(Some(block))
806 }
807
808 if matches!(source, BlockSource::Pending | BlockSource::Any) {
809 return Ok(self
810 .canonical_in_memory_state
811 .pending_block()
812 .filter(|b| b.hash() == hash)
813 .map(|b| b.into_block()))
814 }
815
816 Ok(None)
817 }
818
819 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
820 self.get_in_memory_or_storage_by_block(
821 id,
822 |db_provider| db_provider.block(id),
823 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
824 )
825 }
826
827 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
828 Ok(self.canonical_in_memory_state.pending_recovered_block())
829 }
830
831 fn pending_block_and_receipts(
832 &self,
833 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
834 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
835 }
836
837 fn recovered_block(
844 &self,
845 id: BlockHashOrNumber,
846 transaction_kind: TransactionVariant,
847 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
848 self.get_in_memory_or_storage_by_block(
849 id,
850 |db_provider| db_provider.recovered_block(id, transaction_kind),
851 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
852 )
853 }
854
855 fn sealed_block_with_senders(
856 &self,
857 id: BlockHashOrNumber,
858 transaction_kind: TransactionVariant,
859 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
860 self.get_in_memory_or_storage_by_block(
861 id,
862 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
863 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
864 )
865 }
866
867 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
868 self.get_in_memory_or_storage_by_block_range_while(
869 range,
870 |db_provider, range, _| db_provider.block_range(range),
871 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
872 |_| true,
873 )
874 }
875
876 fn block_with_senders_range(
877 &self,
878 range: RangeInclusive<BlockNumber>,
879 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
880 self.get_in_memory_or_storage_by_block_range_while(
881 range,
882 |db_provider, range, _| db_provider.block_with_senders_range(range),
883 |block_state, _| Some(block_state.block().recovered_block().clone()),
884 |_| true,
885 )
886 }
887
888 fn recovered_block_range(
889 &self,
890 range: RangeInclusive<BlockNumber>,
891 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
892 self.get_in_memory_or_storage_by_block_range_while(
893 range,
894 |db_provider, range, _| db_provider.recovered_block_range(range),
895 |block_state, _| Some(block_state.block().recovered_block().clone()),
896 |_| true,
897 )
898 }
899
900 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
901 self.get_in_memory_or_storage_by_tx(
902 id.into(),
903 |db_provider| db_provider.block_by_transaction_id(id),
904 |_, _, block_state| Ok(Some(block_state.number())),
905 )
906 }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910 type Transaction = TxTy<N>;
911
912 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913 self.get_in_memory_or_storage_by_tx(
914 tx_hash.into(),
915 |db_provider| db_provider.transaction_id(tx_hash),
916 |_, tx_number, _| Ok(Some(tx_number)),
917 )
918 }
919
920 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921 self.get_in_memory_or_storage_by_tx(
922 id.into(),
923 |provider| provider.transaction_by_id(id),
924 |tx_index, _, block_state| {
925 Ok(block_state
926 .block_ref()
927 .recovered_block()
928 .body()
929 .transactions()
930 .get(tx_index)
931 .cloned())
932 },
933 )
934 }
935
936 fn transaction_by_id_unhashed(
937 &self,
938 id: TxNumber,
939 ) -> ProviderResult<Option<Self::Transaction>> {
940 self.get_in_memory_or_storage_by_tx(
941 id.into(),
942 |provider| provider.transaction_by_id_unhashed(id),
943 |tx_index, _, block_state| {
944 Ok(block_state
945 .block_ref()
946 .recovered_block()
947 .body()
948 .transactions()
949 .get(tx_index)
950 .cloned())
951 },
952 )
953 }
954
955 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957 return Ok(Some(tx))
958 }
959
960 self.storage_provider.transaction_by_hash(hash)
961 }
962
963 fn transaction_by_hash_with_meta(
964 &self,
965 tx_hash: TxHash,
966 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967 if let Some((tx, meta)) =
968 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969 {
970 return Ok(Some((tx, meta)))
971 }
972
973 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974 }
975
976 fn transactions_by_block(
977 &self,
978 id: BlockHashOrNumber,
979 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
980 self.get_in_memory_or_storage_by_block(
981 id,
982 |provider| provider.transactions_by_block(id),
983 |block_state| {
984 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
985 },
986 )
987 }
988
989 fn transactions_by_block_range(
990 &self,
991 range: impl RangeBounds<BlockNumber>,
992 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
993 self.get_in_memory_or_storage_by_block_range_while(
994 range,
995 |db_provider, range, _| db_provider.transactions_by_block_range(range),
996 |block_state, _| {
997 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
998 },
999 |_| true,
1000 )
1001 }
1002
1003 fn transactions_by_tx_range(
1004 &self,
1005 range: impl RangeBounds<TxNumber>,
1006 ) -> ProviderResult<Vec<Self::Transaction>> {
1007 self.get_in_memory_or_storage_by_tx_range(
1008 range,
1009 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1010 |index_range, block_state| {
1011 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1012 .to_vec())
1013 },
1014 )
1015 }
1016
1017 fn senders_by_tx_range(
1018 &self,
1019 range: impl RangeBounds<TxNumber>,
1020 ) -> ProviderResult<Vec<Address>> {
1021 self.get_in_memory_or_storage_by_tx_range(
1022 range,
1023 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1024 |index_range, block_state| {
1025 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1026 },
1027 )
1028 }
1029
1030 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1031 self.get_in_memory_or_storage_by_tx(
1032 id.into(),
1033 |provider| provider.transaction_sender(id),
1034 |tx_index, _, block_state| {
1035 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1036 },
1037 )
1038 }
1039}
1040
1041impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1042 type Receipt = ReceiptTy<N>;
1043
1044 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1045 self.get_in_memory_or_storage_by_tx(
1046 id.into(),
1047 |provider| provider.receipt(id),
1048 |tx_index, _, block_state| {
1049 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
1050 },
1051 )
1052 }
1053
1054 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1055 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1056 let executed_block = block_state.block_ref();
1057 let block = executed_block.recovered_block();
1058 let receipts = block_state.executed_block_receipts_ref();
1059
1060 debug_assert_eq!(
1062 block.body().transactions().len(),
1063 receipts.len(),
1064 "Mismatch between transaction and receipt count"
1065 );
1066
1067 if let Some(tx_index) =
1068 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1069 {
1070 return Ok(receipts.get(tx_index).cloned());
1072 }
1073 }
1074
1075 self.storage_provider.receipt_by_hash(hash)
1076 }
1077
1078 fn receipts_by_block(
1079 &self,
1080 block: BlockHashOrNumber,
1081 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1082 self.get_in_memory_or_storage_by_block(
1083 block,
1084 |db_provider| db_provider.receipts_by_block(block),
1085 |block_state| Ok(Some(block_state.executed_block_receipts())),
1086 )
1087 }
1088
1089 fn receipts_by_tx_range(
1090 &self,
1091 range: impl RangeBounds<TxNumber>,
1092 ) -> ProviderResult<Vec<Self::Receipt>> {
1093 self.get_in_memory_or_storage_by_tx_range(
1094 range,
1095 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1096 |index_range, block_state| {
1097 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1098 },
1099 )
1100 }
1101
1102 fn receipts_by_block_range(
1103 &self,
1104 block_range: RangeInclusive<BlockNumber>,
1105 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1106 self.storage_provider.receipts_by_block_range(block_range)
1107 }
1108}
1109
1110impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1111 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1112 match block {
1113 BlockId::Hash(rpc_block_hash) => {
1114 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1115 if receipts.is_none() &&
1116 !rpc_block_hash.require_canonical.unwrap_or(false) &&
1117 let Some(state) = self
1118 .head_block
1119 .as_ref()
1120 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121 {
1122 receipts = Some(state.executed_block_receipts());
1123 }
1124 Ok(receipts)
1125 }
1126 BlockId::Number(num_tag) => match num_tag {
1127 BlockNumberOrTag::Pending => Ok(self
1128 .canonical_in_memory_state
1129 .pending_state()
1130 .map(|block_state| block_state.executed_block_receipts())),
1131 _ => {
1132 if let Some(num) = self.convert_block_number(num_tag)? {
1133 self.receipts_by_block(num.into())
1134 } else {
1135 Ok(None)
1136 }
1137 }
1138 },
1139 }
1140 }
1141}
1142
1143impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1144 fn block_body_indices(
1145 &self,
1146 number: BlockNumber,
1147 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1148 self.get_in_memory_or_storage_by_block(
1149 number.into(),
1150 |db_provider| db_provider.block_body_indices(number),
1151 |block_state| {
1152 let last_storage_block_number = block_state.anchor().number;
1154 let mut stored_indices = self
1155 .storage_provider
1156 .block_body_indices(last_storage_block_number)?
1157 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1158
1159 stored_indices.first_tx_num = stored_indices.next_tx_num();
1161 stored_indices.tx_count = 0;
1162
1163 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1165 let block_tx_count =
1166 state.block_ref().recovered_block().body().transactions().len() as u64;
1167 if state.block_ref().recovered_block().number() == number {
1168 stored_indices.tx_count = block_tx_count;
1169 } else {
1170 stored_indices.first_tx_num += block_tx_count;
1171 }
1172 }
1173
1174 Ok(Some(stored_indices))
1175 },
1176 )
1177 }
1178
1179 fn block_body_indices_range(
1180 &self,
1181 range: RangeInclusive<BlockNumber>,
1182 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1183 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1184 }
1185}
1186
1187impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1188 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1189 self.storage_provider.get_stage_checkpoint(id)
1190 }
1191
1192 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1193 self.storage_provider.get_stage_checkpoint_progress(id)
1194 }
1195
1196 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1197 self.storage_provider.get_all_checkpoints()
1198 }
1199}
1200
1201impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1202 fn get_prune_checkpoint(
1203 &self,
1204 segment: PruneSegment,
1205 ) -> ProviderResult<Option<PruneCheckpoint>> {
1206 self.storage_provider.get_prune_checkpoint(segment)
1207 }
1208
1209 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1210 self.storage_provider.get_prune_checkpoints()
1211 }
1212}
1213
1214impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1215 type ChainSpec = N::ChainSpec;
1216
1217 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1218 ChainSpecProvider::chain_spec(&self.storage_provider)
1219 }
1220}
1221
1222impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1223 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1224 match id {
1225 BlockId::Number(num) => self.block_by_number_or_tag(num),
1226 BlockId::Hash(hash) => {
1227 if Some(true) == hash.require_canonical {
1232 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1234 } else {
1235 self.block_by_hash(hash.block_hash)
1236 }
1237 }
1238 }
1239 }
1240
1241 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1242 Ok(match id {
1243 BlockNumberOrTag::Latest => {
1244 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1245 }
1246 BlockNumberOrTag::Finalized => {
1247 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1248 }
1249 BlockNumberOrTag::Safe => {
1250 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1251 }
1252 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1253 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1254
1255 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1256 })
1257 }
1258
1259 fn sealed_header_by_number_or_tag(
1260 &self,
1261 id: BlockNumberOrTag,
1262 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1263 match id {
1264 BlockNumberOrTag::Latest => {
1265 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1266 }
1267 BlockNumberOrTag::Finalized => {
1268 Ok(self.canonical_in_memory_state.get_finalized_header())
1269 }
1270 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1271 BlockNumberOrTag::Earliest => self
1272 .header_by_number(self.earliest_block_number()?)?
1273 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1274 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1275 BlockNumberOrTag::Number(num) => self
1276 .header_by_number(num)?
1277 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1278 }
1279 }
1280
1281 fn sealed_header_by_id(
1282 &self,
1283 id: BlockId,
1284 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1285 Ok(match id {
1286 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1287 BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow),
1288 })
1289 }
1290
1291 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1292 Ok(match id {
1293 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1294 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1295 })
1296 }
1297}
1298
1299impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1300 fn storage_changeset(
1301 &self,
1302 block_number: BlockNumber,
1303 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1304 if let Some(state) =
1305 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1306 {
1307 let changesets = state
1308 .block()
1309 .execution_output
1310 .state
1311 .reverts
1312 .clone()
1313 .to_plain_state_reverts()
1314 .storage
1315 .into_iter()
1316 .flatten()
1317 .flat_map(|revert: PlainStorageRevert| {
1318 revert.storage_revert.into_iter().map(move |(key, value)| {
1319 (
1320 BlockNumberAddress((block_number, revert.address)),
1321 StorageEntry { key: key.into(), value: value.to_previous_value() },
1322 )
1323 })
1324 })
1325 .collect();
1326 Ok(changesets)
1327 } else {
1328 let storage_history_exists = self
1332 .storage_provider
1333 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1334 .and_then(|checkpoint| {
1335 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1340 })
1341 .unwrap_or(true);
1342
1343 if !storage_history_exists {
1344 return Err(ProviderError::StateAtBlockPruned(block_number))
1345 }
1346
1347 self.storage_provider.storage_changeset(block_number)
1348 }
1349 }
1350
1351 fn get_storage_before_block(
1352 &self,
1353 block_number: BlockNumber,
1354 address: Address,
1355 storage_key: B256,
1356 ) -> ProviderResult<Option<StorageEntry>> {
1357 if let Some(state) =
1358 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1359 {
1360 let changeset = state
1361 .block_ref()
1362 .execution_output
1363 .state
1364 .reverts
1365 .clone()
1366 .to_plain_state_reverts()
1367 .storage
1368 .into_iter()
1369 .flatten()
1370 .find_map(|revert: PlainStorageRevert| {
1371 if revert.address != address {
1372 return None
1373 }
1374 revert.storage_revert.into_iter().find_map(|(key, value)| {
1375 let key = key.into();
1376 (key == storage_key)
1377 .then(|| StorageEntry { key, value: value.to_previous_value() })
1378 })
1379 });
1380 Ok(changeset)
1381 } else {
1382 let storage_history_exists = self
1383 .storage_provider
1384 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1385 .and_then(|checkpoint| {
1386 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1387 })
1388 .unwrap_or(true);
1389
1390 if !storage_history_exists {
1391 return Err(ProviderError::StateAtBlockPruned(block_number))
1392 }
1393
1394 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1395 }
1396 }
1397
1398 fn storage_changesets_range(
1399 &self,
1400 range: impl RangeBounds<BlockNumber>,
1401 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1402 let range = to_range(range);
1403 let mut changesets = Vec::new();
1404 let database_start = range.start;
1405 let mut database_end = range.end;
1406
1407 if let Some(head_block) = &self.head_block {
1408 database_end = head_block.anchor().number;
1409
1410 let chain = head_block.chain().collect::<Vec<_>>();
1411 for state in chain {
1412 let block_changesets = state
1413 .block_ref()
1414 .execution_output
1415 .state
1416 .reverts
1417 .clone()
1418 .to_plain_state_reverts()
1419 .storage
1420 .into_iter()
1421 .flatten()
1422 .flat_map(|revert: PlainStorageRevert| {
1423 revert.storage_revert.into_iter().map(move |(key, value)| {
1424 (
1425 BlockNumberAddress((state.number(), revert.address)),
1426 StorageEntry { key: key.into(), value: value.to_previous_value() },
1427 )
1428 })
1429 });
1430
1431 changesets.extend(block_changesets);
1432 }
1433 }
1434
1435 if database_start < database_end {
1436 let storage_history_exists = self
1437 .storage_provider
1438 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1439 .and_then(|checkpoint| {
1440 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1441 })
1442 .unwrap_or(true);
1443
1444 if !storage_history_exists {
1445 return Err(ProviderError::StateAtBlockPruned(database_start))
1446 }
1447
1448 let db_changesets = self
1449 .storage_provider
1450 .storage_changesets_range(database_start..=database_end - 1)?;
1451 changesets.extend(db_changesets);
1452 }
1453
1454 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1455
1456 Ok(changesets)
1457 }
1458
1459 fn storage_changeset_count(&self) -> ProviderResult<usize> {
1460 let mut count = 0;
1461 if let Some(head_block) = &self.head_block {
1462 for state in head_block.chain() {
1463 count += state
1464 .block_ref()
1465 .execution_output
1466 .state
1467 .reverts
1468 .clone()
1469 .to_plain_state_reverts()
1470 .storage
1471 .into_iter()
1472 .flatten()
1473 .map(|revert: PlainStorageRevert| revert.storage_revert.len())
1474 .sum::<usize>();
1475 }
1476 }
1477
1478 count += self.storage_provider.storage_changeset_count()?;
1479
1480 Ok(count)
1481 }
1482}
1483
1484impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1485 fn account_block_changeset(
1486 &self,
1487 block_number: BlockNumber,
1488 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1489 if let Some(state) =
1490 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1491 {
1492 let changesets = state
1493 .block_ref()
1494 .execution_output
1495 .state
1496 .reverts
1497 .clone()
1498 .to_plain_state_reverts()
1499 .accounts
1500 .into_iter()
1501 .flatten()
1502 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1503 .collect();
1504 Ok(changesets)
1505 } else {
1506 let account_history_exists = self
1510 .storage_provider
1511 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1512 .and_then(|checkpoint| {
1513 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1518 })
1519 .unwrap_or(true);
1520
1521 if !account_history_exists {
1522 return Err(ProviderError::StateAtBlockPruned(block_number))
1523 }
1524
1525 self.storage_provider.account_block_changeset(block_number)
1526 }
1527 }
1528
1529 fn get_account_before_block(
1530 &self,
1531 block_number: BlockNumber,
1532 address: Address,
1533 ) -> ProviderResult<Option<AccountBeforeTx>> {
1534 if let Some(state) =
1535 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1536 {
1537 let changeset = state
1539 .block_ref()
1540 .execution_output
1541 .state
1542 .reverts
1543 .clone()
1544 .to_plain_state_reverts()
1545 .accounts
1546 .into_iter()
1547 .flatten()
1548 .find(|(addr, _)| addr == &address)
1549 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1550 Ok(changeset)
1551 } else {
1552 let account_history_exists = self
1555 .storage_provider
1556 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1557 .and_then(|checkpoint| {
1558 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1563 })
1564 .unwrap_or(true);
1565
1566 if !account_history_exists {
1567 return Err(ProviderError::StateAtBlockPruned(block_number))
1568 }
1569
1570 self.storage_provider.get_account_before_block(block_number, address)
1572 }
1573 }
1574
1575 fn account_changesets_range(
1576 &self,
1577 range: impl core::ops::RangeBounds<BlockNumber>,
1578 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1579 let range = to_range(range);
1580 let mut changesets = Vec::new();
1581 let database_start = range.start;
1582 let mut database_end = range.end;
1583
1584 if let Some(head_block) = &self.head_block {
1586 database_end = head_block.anchor().number;
1588
1589 let chain = head_block.chain().collect::<Vec<_>>();
1590 for state in chain {
1591 let block_changesets = state
1593 .block_ref()
1594 .execution_output
1595 .state
1596 .reverts
1597 .clone()
1598 .to_plain_state_reverts()
1599 .accounts
1600 .into_iter()
1601 .flatten()
1602 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1603
1604 for changeset in block_changesets {
1605 changesets.push((state.number(), changeset));
1606 }
1607 }
1608 }
1609
1610 if database_start < database_end {
1612 let account_history_exists = self
1614 .storage_provider
1615 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1616 .and_then(|checkpoint| {
1617 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1618 })
1619 .unwrap_or(true);
1620
1621 if !account_history_exists {
1622 return Err(ProviderError::StateAtBlockPruned(database_start))
1623 }
1624
1625 let db_changesets =
1626 self.storage_provider.account_changesets_range(database_start..database_end)?;
1627 changesets.extend(db_changesets);
1628 }
1629
1630 changesets.sort_by_key(|(block_num, _)| *block_num);
1631
1632 Ok(changesets)
1633 }
1634
1635 fn account_changeset_count(&self) -> ProviderResult<usize> {
1636 let mut count = 0;
1638 if let Some(head_block) = &self.head_block {
1639 for state in head_block.chain() {
1640 count += state
1641 .block_ref()
1642 .execution_output
1643 .state
1644 .reverts
1645 .clone()
1646 .to_plain_state_reverts()
1647 .accounts
1648 .len();
1649 }
1650 }
1651
1652 count += self.storage_provider.account_changeset_count()?;
1654
1655 Ok(count)
1656 }
1657}
1658
1659impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1660 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1662 let state_provider = self.latest_ref()?;
1664 state_provider.basic_account(address)
1665 }
1666}
1667
1668impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1669 type Receipt = ReceiptTy<N>;
1670
1671 fn get_state(
1681 &self,
1682 block: BlockNumber,
1683 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1684 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1685 let state = state.block_ref().execution_outcome().clone();
1686 Ok(Some(ExecutionOutcome::from((state, block))))
1687 } else {
1688 Self::get_state(self, block..=block)
1689 }
1690 }
1691}
1692
1693#[cfg(test)]
1694mod tests {
1695 use crate::{
1696 providers::blockchain_provider::BlockchainProvider,
1697 test_utils::create_test_provider_factory, BlockWriter,
1698 };
1699 use alloy_eips::BlockHashOrNumber;
1700 use alloy_primitives::B256;
1701 use itertools::Itertools;
1702 use rand::Rng;
1703 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1704 use reth_db_api::models::AccountBeforeTx;
1705 use reth_ethereum_primitives::Block;
1706 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1707 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1708 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1709 use reth_testing_utils::generators::{
1710 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1711 };
1712 use revm_database::BundleState;
1713 use std::{
1714 ops::{Bound, Range, RangeBounds},
1715 sync::Arc,
1716 };
1717
1718 const TEST_BLOCKS_COUNT: usize = 5;
1719
1720 fn random_blocks(
1721 rng: &mut impl Rng,
1722 database_blocks: usize,
1723 in_memory_blocks: usize,
1724 requests_count: Option<Range<u8>>,
1725 withdrawals_count: Option<Range<u8>>,
1726 tx_count: impl RangeBounds<u8>,
1727 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1728 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1729
1730 let tx_start = match tx_count.start_bound() {
1731 Bound::Included(&n) | Bound::Excluded(&n) => n,
1732 Bound::Unbounded => u8::MIN,
1733 };
1734 let tx_end = match tx_count.end_bound() {
1735 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1736 Bound::Unbounded => u8::MAX,
1737 };
1738
1739 let blocks = random_block_range(
1740 rng,
1741 0..=block_range,
1742 BlockRangeParams {
1743 parent: Some(B256::ZERO),
1744 tx_count: tx_start..tx_end,
1745 requests_count,
1746 withdrawals_count,
1747 },
1748 );
1749 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1750 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1751 }
1752
1753 #[test]
1754 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1755 let mut rng = generators::rng();
1757 let factory = create_test_provider_factory();
1758
1759 let blocks = random_block_range(
1761 &mut rng,
1762 0..=10,
1763 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1764 );
1765 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1766
1767 let provider_rw = factory.provider_rw()?;
1769 for block in database_blocks {
1770 provider_rw.insert_block(
1771 &block.clone().try_recover().expect("failed to seal block with senders"),
1772 )?;
1773 }
1774 provider_rw.commit()?;
1775
1776 let provider = BlockchainProvider::new(factory)?;
1778 let consistent_provider = provider.consistent_provider()?;
1779
1780 let first_db_block = database_blocks.first().unwrap();
1782 let first_in_mem_block = in_memory_blocks.first().unwrap();
1783 let last_in_mem_block = in_memory_blocks.last().unwrap();
1784
1785 assert_eq!(
1787 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1788 None
1789 );
1790 assert_eq!(
1791 consistent_provider
1792 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1793 None
1794 );
1795 assert_eq!(
1797 consistent_provider
1798 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1799 None
1800 );
1801
1802 let in_memory_block_senders =
1804 first_in_mem_block.senders().expect("failed to recover senders");
1805 let chain = NewCanonicalChain::Commit {
1806 new: vec![ExecutedBlock {
1807 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1808 first_in_mem_block.clone(),
1809 in_memory_block_senders,
1810 )),
1811 ..Default::default()
1812 }],
1813 };
1814 consistent_provider.canonical_in_memory_state.update_chain(chain);
1815 let consistent_provider = provider.consistent_provider()?;
1816
1817 assert_eq!(
1819 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1820 Some(first_in_mem_block.clone().into_block())
1821 );
1822 assert_eq!(
1823 consistent_provider
1824 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1825 Some(first_in_mem_block.clone().into_block())
1826 );
1827
1828 assert_eq!(
1830 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1831 Some(first_db_block.clone().into_block())
1832 );
1833 assert_eq!(
1834 consistent_provider
1835 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1836 Some(first_db_block.clone().into_block())
1837 );
1838
1839 assert_eq!(
1841 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1842 None
1843 );
1844
1845 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1847 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1848 last_in_mem_block.clone(),
1849 Default::default(),
1850 )),
1851 ..Default::default()
1852 });
1853
1854 assert_eq!(
1856 consistent_provider
1857 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1858 Some(last_in_mem_block.clone_block())
1859 );
1860
1861 Ok(())
1862 }
1863
1864 #[test]
1865 fn test_block_reader_block() -> eyre::Result<()> {
1866 let mut rng = generators::rng();
1868 let factory = create_test_provider_factory();
1869
1870 let blocks = random_block_range(
1872 &mut rng,
1873 0..=10,
1874 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1875 );
1876 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1877
1878 let provider_rw = factory.provider_rw()?;
1880 for block in database_blocks {
1881 provider_rw.insert_block(
1882 &block.clone().try_recover().expect("failed to seal block with senders"),
1883 )?;
1884 }
1885 provider_rw.commit()?;
1886
1887 let provider = BlockchainProvider::new(factory)?;
1889 let consistent_provider = provider.consistent_provider()?;
1890
1891 let first_in_mem_block = in_memory_blocks.first().unwrap();
1893 let first_db_block = database_blocks.first().unwrap();
1895
1896 assert_eq!(
1898 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1899 None
1900 );
1901 assert_eq!(
1902 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1903 None
1904 );
1905
1906 let in_memory_block_senders =
1908 first_in_mem_block.senders().expect("failed to recover senders");
1909 let chain = NewCanonicalChain::Commit {
1910 new: vec![ExecutedBlock {
1911 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1912 first_in_mem_block.clone(),
1913 in_memory_block_senders,
1914 )),
1915 ..Default::default()
1916 }],
1917 };
1918 consistent_provider.canonical_in_memory_state.update_chain(chain);
1919
1920 let consistent_provider = provider.consistent_provider()?;
1921
1922 assert_eq!(
1924 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1925 Some(first_in_mem_block.clone().into_block())
1926 );
1927 assert_eq!(
1928 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1929 Some(first_in_mem_block.clone().into_block())
1930 );
1931
1932 assert_eq!(
1934 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1935 Some(first_db_block.clone().into_block())
1936 );
1937 assert_eq!(
1938 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1939 Some(first_db_block.clone().into_block())
1940 );
1941
1942 Ok(())
1943 }
1944
1945 #[test]
1946 fn test_changeset_reader() -> eyre::Result<()> {
1947 let mut rng = generators::rng();
1948
1949 let (database_blocks, in_memory_blocks) =
1950 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1951
1952 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1953 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1954 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1955
1956 let accounts = random_eoa_accounts(&mut rng, 2);
1957
1958 let (database_changesets, database_state) = random_changeset_range(
1959 &mut rng,
1960 &database_blocks,
1961 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1962 0..0,
1963 0..0,
1964 );
1965 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1966 &mut rng,
1967 &in_memory_blocks,
1968 database_state
1969 .iter()
1970 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1971 0..0,
1972 0..0,
1973 );
1974
1975 let factory = create_test_provider_factory();
1976
1977 let provider_rw = factory.provider_rw()?;
1978 provider_rw.append_blocks_with_state(
1979 database_blocks
1980 .into_iter()
1981 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1982 .collect(),
1983 &ExecutionOutcome {
1984 bundle: BundleState::new(
1985 database_state.into_iter().map(|(address, (account, _))| {
1986 (address, None, Some(account.into()), Default::default())
1987 }),
1988 database_changesets.iter().map(|block_changesets| {
1989 block_changesets.iter().map(|(address, account, _)| {
1990 (*address, Some(Some((*account).into())), [])
1991 })
1992 }),
1993 Vec::new(),
1994 ),
1995 first_block: first_database_block,
1996 ..Default::default()
1997 },
1998 Default::default(),
1999 )?;
2000 provider_rw.commit()?;
2001
2002 let provider = BlockchainProvider::new(factory)?;
2003
2004 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
2005 let chain = NewCanonicalChain::Commit {
2006 new: vec![in_memory_blocks
2007 .first()
2008 .map(|block| {
2009 let senders = block.senders().expect("failed to recover senders");
2010 ExecutedBlock {
2011 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2012 block.clone(),
2013 senders,
2014 )),
2015 execution_output: Arc::new(BlockExecutionOutput {
2016 state: BundleState::new(
2017 in_memory_state.into_iter().map(|(address, (account, _))| {
2018 (address, None, Some(account.into()), Default::default())
2019 }),
2020 [in_memory_changesets.iter().map(|(address, account, _)| {
2021 (*address, Some(Some((*account).into())), Vec::new())
2022 })],
2023 [],
2024 ),
2025 result: BlockExecutionResult {
2026 receipts: Default::default(),
2027 requests: Default::default(),
2028 gas_used: 0,
2029 blob_gas_used: 0,
2030 },
2031 }),
2032 ..Default::default()
2033 }
2034 })
2035 .unwrap()],
2036 };
2037 provider.canonical_in_memory_state.update_chain(chain);
2038
2039 let consistent_provider = provider.consistent_provider()?;
2040
2041 assert_eq!(
2042 consistent_provider.account_block_changeset(last_database_block).unwrap(),
2043 database_changesets
2044 .into_iter()
2045 .next_back()
2046 .unwrap()
2047 .into_iter()
2048 .sorted_by_key(|(address, _, _)| *address)
2049 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2050 .collect::<Vec<_>>()
2051 );
2052 assert_eq!(
2053 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
2054 in_memory_changesets
2055 .into_iter()
2056 .sorted_by_key(|(address, _, _)| *address)
2057 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
2058 .collect::<Vec<_>>()
2059 );
2060
2061 Ok(())
2062 }
2063}