1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::{StaticFileProvider, StaticFileProviderRWRefMut},
4 to_range, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
5 BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
6 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
7 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
8 TransactionsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
11use alloy_eips::{
12 eip2718::Encodable2718, BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag,
13 HashOrNumber,
14};
15use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
17use reth_chainspec::ChainInfo;
18use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
19use reth_execution_types::ExecutionOutcome;
20use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
21use reth_primitives_traits::{Account, BlockBody, RecoveredBlock, SealedHeader, StorageEntry};
22use reth_prune_types::{PruneCheckpoint, PruneSegment};
23use reth_stages_types::{StageCheckpoint, StageId};
24use reth_static_file_types::StaticFileSegment;
25use reth_storage_api::{
26 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, StateProvider,
27 StateProviderBox, StorageChangeSetReader, TryIntoHistoricalStateProvider,
28};
29use reth_storage_errors::provider::ProviderResult;
30use revm_database::states::PlainStorageRevert;
31use std::{
32 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
33 sync::Arc,
34};
35use tracing::trace;
36
37#[derive(Debug)]
44#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
46 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
48 head_block: Option<Arc<BlockState<N::Primitives>>>,
50 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
52}
53
54impl<N: ProviderNodeTypes> ConsistentProvider<N> {
55 pub fn new(
61 storage_provider_factory: ProviderFactory<N>,
62 state: CanonicalInMemoryState<N::Primitives>,
63 ) -> ProviderResult<Self> {
64 let head_block = state.head_state();
72 let storage_provider = storage_provider_factory.database_provider_ro()?;
73 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
74 }
75
76 fn convert_range_bounds<T>(
78 &self,
79 range: impl RangeBounds<T>,
80 end_unbounded: impl FnOnce() -> T,
81 ) -> (T, T)
82 where
83 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
84 {
85 let start = match range.start_bound() {
86 Bound::Included(&n) => n,
87 Bound::Excluded(&n) => n + T::from(1u8),
88 Bound::Unbounded => T::from(0u8),
89 };
90
91 let end = match range.end_bound() {
92 Bound::Included(&n) => n,
93 Bound::Excluded(&n) => n - T::from(1u8),
94 Bound::Unbounded => end_unbounded(),
95 };
96
97 (start, end)
98 }
99
100 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
102 trace!(target: "providers::blockchain", "Getting latest block state provider");
103
104 if let Some(state) = &self.head_block {
106 trace!(target: "providers::blockchain", "Using head state for latest state provider");
107 Ok(self.block_state_provider_ref(state)?.boxed())
108 } else {
109 trace!(target: "providers::blockchain", "Using database state for latest state provider");
110 Ok(self.storage_provider.latest())
111 }
112 }
113
114 fn history_by_block_hash_ref<'a>(
115 &'a self,
116 block_hash: BlockHash,
117 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
118 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
119
120 self.get_in_memory_or_storage_by_block(
121 block_hash.into(),
122 |_| self.storage_provider.history_by_block_hash(block_hash),
123 |block_state| {
124 let state_provider = self.block_state_provider_ref(block_state)?;
125 Ok(Box::new(state_provider))
126 },
127 )
128 }
129
130 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
142 &self,
143 range: impl RangeBounds<BlockNumber>,
144 fetch_db_range: F,
145 map_block_state_item: G,
146 mut predicate: P,
147 ) -> ProviderResult<Vec<T>>
148 where
149 F: FnOnce(
150 &DatabaseProviderRO<N::DB, N>,
151 RangeInclusive<BlockNumber>,
152 &mut P,
153 ) -> ProviderResult<Vec<T>>,
154 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
155 P: FnMut(&T) -> bool,
156 {
157 let mut in_memory_chain =
165 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
166 let db_provider = &self.storage_provider;
167
168 let (start, end) = self.convert_range_bounds(range, || {
169 in_memory_chain
171 .first()
172 .map(|b| b.number())
173 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
174 });
175
176 if start > end {
177 return Ok(vec![])
178 }
179
180 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
185 Some(lowest_memory_block) if lowest_memory_block <= end => {
186 let highest_memory_block =
187 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
188
189 let in_memory_range =
193 lowest_memory_block.max(start)..=end.min(highest_memory_block);
194
195 in_memory_chain.truncate(
198 in_memory_chain
199 .len()
200 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
201 );
202
203 let storage_range =
204 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
205
206 (Some((in_memory_chain, in_memory_range)), storage_range)
207 }
208 _ => {
209 drop(in_memory_chain);
211
212 (None, Some(start..=end))
213 }
214 };
215
216 let mut items = Vec::with_capacity((end - start + 1) as usize);
217
218 if let Some(storage_range) = storage_range {
219 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
220 items.append(&mut db_items);
221
222 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
225 return Ok(items)
226 }
227 }
228
229 if let Some((in_memory_chain, in_memory_range)) = in_memory {
230 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
231 debug_assert!(num == block.number());
232 if let Some(item) = map_block_state_item(block, &mut predicate) {
233 items.push(item);
234 } else {
235 break
236 }
237 }
238 }
239
240 Ok(items)
241 }
242
243 fn block_state_provider_ref(
245 &self,
246 state: &BlockState<N::Primitives>,
247 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
248 let anchor_hash = state.anchor().hash;
249 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
250 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
251 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
252 }
253
254 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
260 &self,
261 range: impl RangeBounds<BlockNumber>,
262 fetch_from_db: S,
263 fetch_from_block_state: M,
264 ) -> ProviderResult<Vec<R>>
265 where
266 S: FnOnce(
267 &DatabaseProviderRO<N::DB, N>,
268 RangeInclusive<TxNumber>,
269 ) -> ProviderResult<Vec<R>>,
270 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
271 {
272 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
273 let provider = &self.storage_provider;
274
275 let last_database_block_number = in_mem_chain
278 .last()
279 .map(|b| Ok(b.anchor().number))
280 .unwrap_or_else(|| provider.last_block_number())?;
281
282 let last_block_body_index = provider
285 .block_body_indices(last_database_block_number)?
286 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
287 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
288
289 let (start, end) = self.convert_range_bounds(range, || {
290 in_mem_chain
291 .iter()
292 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
293 .sum::<u64>() +
294 last_block_body_index.last_tx_num()
295 });
296
297 if start > end {
298 return Ok(vec![])
299 }
300
301 let mut tx_range = start..=end;
302
303 if *tx_range.end() < in_memory_tx_num {
306 return fetch_from_db(provider, tx_range);
307 }
308
309 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
310
311 if *tx_range.start() < in_memory_tx_num {
313 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
315
316 tx_range = in_memory_tx_num..=*tx_range.end();
318
319 items.extend(fetch_from_db(provider, db_range)?);
320 }
321
322 for block_state in in_mem_chain.iter().rev() {
324 let block_tx_count =
325 block_state.block_ref().recovered_block().body().transactions().len();
326 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
327
328 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
331 in_memory_tx_num += block_tx_count as u64;
332 continue
333 }
334
335 let skip = (tx_range.start() - in_memory_tx_num) as usize;
337
338 items.extend(fetch_from_block_state(
339 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
340 block_state,
341 )?);
342
343 in_memory_tx_num += block_tx_count as u64;
344
345 if in_memory_tx_num > *tx_range.end() {
347 break
348 }
349
350 tx_range = in_memory_tx_num..=*tx_range.end();
352 }
353
354 Ok(items)
355 }
356
357 fn get_in_memory_or_storage_by_tx<S, M, R>(
360 &self,
361 id: HashOrNumber,
362 fetch_from_db: S,
363 fetch_from_block_state: M,
364 ) -> ProviderResult<Option<R>>
365 where
366 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
367 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
368 {
369 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
370 let provider = &self.storage_provider;
371
372 let last_database_block_number = in_mem_chain
375 .last()
376 .map(|b| Ok(b.anchor().number))
377 .unwrap_or_else(|| provider.last_block_number())?;
378
379 let last_block_body_index = provider
382 .block_body_indices(last_database_block_number)?
383 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
384 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
385
386 if let HashOrNumber::Number(id) = id &&
389 id < in_memory_tx_num
390 {
391 return fetch_from_db(provider)
392 }
393
394 for block_state in in_mem_chain.iter().rev() {
396 let executed_block = block_state.block_ref();
397 let block = executed_block.recovered_block();
398
399 for tx_index in 0..block.body().transactions().len() {
400 match id {
401 HashOrNumber::Hash(tx_hash) => {
402 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
403 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
404 }
405 }
406 HashOrNumber::Number(id) => {
407 if id == in_memory_tx_num {
408 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
409 }
410 }
411 }
412
413 in_memory_tx_num += 1;
414 }
415 }
416
417 if let HashOrNumber::Hash(_) = id {
419 return fetch_from_db(provider)
420 }
421
422 Ok(None)
423 }
424
425 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
427 &self,
428 id: BlockHashOrNumber,
429 fetch_from_db: S,
430 fetch_from_block_state: M,
431 ) -> ProviderResult<R>
432 where
433 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
434 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
435 {
436 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
437 return fetch_from_block_state(block_state)
438 }
439 fetch_from_db(&self.storage_provider)
440 }
441
442 pub(crate) fn into_state_provider_at_block_hash(
444 self,
445 block_hash: BlockHash,
446 ) -> ProviderResult<StateProviderBox> {
447 let block_number =
449 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
450 self.ensure_canonical_block(block_number)?;
451
452 let Self { storage_provider, head_block, .. } = self;
453 if let Some(Some(block_state)) =
454 head_block.as_ref().map(|b| b.block_on_chain(block_hash.into()))
455 {
456 let anchor_hash = block_state.anchor().hash;
457 let block_number = storage_provider
458 .block_number(anchor_hash)?
459 .ok_or(ProviderError::BlockHashNotFound(anchor_hash))?;
460 let latest_historical = storage_provider.try_into_history_at_block(block_number)?;
461 return Ok(Box::new(block_state.state_provider(latest_historical)));
462 }
463 storage_provider.try_into_history_at_block(block_number)
464 }
465}
466
467impl<N: ProviderNodeTypes> ConsistentProvider<N> {
468 #[inline]
477 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
478 let latest = self.best_block_number()?;
479 if block_number > latest {
480 Err(ProviderError::HeaderNotFound(block_number.into()))
481 } else {
482 Ok(())
483 }
484 }
485}
486
487impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
488 type Primitives = N::Primitives;
489}
490
491impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
492 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
493 self.storage_provider.static_file_provider()
494 }
495
496 fn get_static_file_writer(
497 &self,
498 block: BlockNumber,
499 segment: StaticFileSegment,
500 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
501 self.storage_provider.get_static_file_writer(block, segment)
502 }
503}
504
505impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
506 type Header = HeaderTy<N>;
507
508 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
509 self.get_in_memory_or_storage_by_block(
510 block_hash.into(),
511 |db_provider| db_provider.header(block_hash),
512 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
513 )
514 }
515
516 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
517 self.get_in_memory_or_storage_by_block(
518 num.into(),
519 |db_provider| db_provider.header_by_number(num),
520 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
521 )
522 }
523
524 fn headers_range(
525 &self,
526 range: impl RangeBounds<BlockNumber>,
527 ) -> ProviderResult<Vec<Self::Header>> {
528 self.get_in_memory_or_storage_by_block_range_while(
529 range,
530 |db_provider, range, _| db_provider.headers_range(range),
531 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
532 |_| true,
533 )
534 }
535
536 fn sealed_header(
537 &self,
538 number: BlockNumber,
539 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
540 self.get_in_memory_or_storage_by_block(
541 number.into(),
542 |db_provider| db_provider.sealed_header(number),
543 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
544 )
545 }
546
547 fn sealed_headers_range(
548 &self,
549 range: impl RangeBounds<BlockNumber>,
550 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
551 self.get_in_memory_or_storage_by_block_range_while(
552 range,
553 |db_provider, range, _| db_provider.sealed_headers_range(range),
554 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
555 |_| true,
556 )
557 }
558
559 fn sealed_headers_while(
560 &self,
561 range: impl RangeBounds<BlockNumber>,
562 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
563 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
564 self.get_in_memory_or_storage_by_block_range_while(
565 range,
566 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
567 |block_state, predicate| {
568 let header = block_state.block_ref().recovered_block().sealed_header();
569 predicate(header).then(|| header.clone())
570 },
571 predicate,
572 )
573 }
574}
575
576impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
577 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
578 self.get_in_memory_or_storage_by_block(
579 number.into(),
580 |db_provider| db_provider.block_hash(number),
581 |block_state| Ok(Some(block_state.hash())),
582 )
583 }
584
585 fn canonical_hashes_range(
586 &self,
587 start: BlockNumber,
588 end: BlockNumber,
589 ) -> ProviderResult<Vec<B256>> {
590 self.get_in_memory_or_storage_by_block_range_while(
591 start..end,
592 |db_provider, inclusive_range, _| {
593 db_provider
594 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
595 },
596 |block_state, _| Some(block_state.hash()),
597 |_| true,
598 )
599 }
600}
601
602impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
603 fn chain_info(&self) -> ProviderResult<ChainInfo> {
604 let best_number = self.best_block_number()?;
605 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
606 }
607
608 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
609 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
610 }
611
612 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
613 self.storage_provider.last_block_number()
614 }
615
616 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
617 self.get_in_memory_or_storage_by_block(
618 hash.into(),
619 |db_provider| db_provider.block_number(hash),
620 |block_state| Ok(Some(block_state.number())),
621 )
622 }
623}
624
625impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
626 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
627 Ok(self.canonical_in_memory_state.pending_block_num_hash())
628 }
629
630 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
631 Ok(self.canonical_in_memory_state.get_safe_num_hash())
632 }
633
634 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
635 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
636 }
637}
638
639impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
640 type Block = BlockTy<N>;
641
642 fn find_block_by_hash(
643 &self,
644 hash: B256,
645 source: BlockSource,
646 ) -> ProviderResult<Option<Self::Block>> {
647 if matches!(source, BlockSource::Canonical | BlockSource::Any) &&
648 let Some(block) = self.get_in_memory_or_storage_by_block(
649 hash.into(),
650 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
651 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
652 )?
653 {
654 return Ok(Some(block))
655 }
656
657 if matches!(source, BlockSource::Pending | BlockSource::Any) {
658 return Ok(self
659 .canonical_in_memory_state
660 .pending_block()
661 .filter(|b| b.hash() == hash)
662 .map(|b| b.into_block()))
663 }
664
665 Ok(None)
666 }
667
668 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
669 self.get_in_memory_or_storage_by_block(
670 id,
671 |db_provider| db_provider.block(id),
672 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
673 )
674 }
675
676 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
677 Ok(self.canonical_in_memory_state.pending_recovered_block())
678 }
679
680 fn pending_block_and_receipts(
681 &self,
682 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
683 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
684 }
685
686 fn recovered_block(
693 &self,
694 id: BlockHashOrNumber,
695 transaction_kind: TransactionVariant,
696 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
697 self.get_in_memory_or_storage_by_block(
698 id,
699 |db_provider| db_provider.recovered_block(id, transaction_kind),
700 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
701 )
702 }
703
704 fn sealed_block_with_senders(
705 &self,
706 id: BlockHashOrNumber,
707 transaction_kind: TransactionVariant,
708 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
709 self.get_in_memory_or_storage_by_block(
710 id,
711 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
712 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
713 )
714 }
715
716 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
717 self.get_in_memory_or_storage_by_block_range_while(
718 range,
719 |db_provider, range, _| db_provider.block_range(range),
720 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
721 |_| true,
722 )
723 }
724
725 fn block_with_senders_range(
726 &self,
727 range: RangeInclusive<BlockNumber>,
728 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
729 self.get_in_memory_or_storage_by_block_range_while(
730 range,
731 |db_provider, range, _| db_provider.block_with_senders_range(range),
732 |block_state, _| Some(block_state.block().recovered_block().clone()),
733 |_| true,
734 )
735 }
736
737 fn recovered_block_range(
738 &self,
739 range: RangeInclusive<BlockNumber>,
740 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
741 self.get_in_memory_or_storage_by_block_range_while(
742 range,
743 |db_provider, range, _| db_provider.recovered_block_range(range),
744 |block_state, _| Some(block_state.block().recovered_block().clone()),
745 |_| true,
746 )
747 }
748
749 fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
750 self.get_in_memory_or_storage_by_tx(
751 id.into(),
752 |db_provider| db_provider.block_by_transaction_id(id),
753 |_, _, block_state| Ok(Some(block_state.number())),
754 )
755 }
756}
757
758impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
759 type Transaction = TxTy<N>;
760
761 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
762 self.get_in_memory_or_storage_by_tx(
763 tx_hash.into(),
764 |db_provider| db_provider.transaction_id(tx_hash),
765 |_, tx_number, _| Ok(Some(tx_number)),
766 )
767 }
768
769 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
770 self.get_in_memory_or_storage_by_tx(
771 id.into(),
772 |provider| provider.transaction_by_id(id),
773 |tx_index, _, block_state| {
774 Ok(block_state
775 .block_ref()
776 .recovered_block()
777 .body()
778 .transactions()
779 .get(tx_index)
780 .cloned())
781 },
782 )
783 }
784
785 fn transaction_by_id_unhashed(
786 &self,
787 id: TxNumber,
788 ) -> ProviderResult<Option<Self::Transaction>> {
789 self.get_in_memory_or_storage_by_tx(
790 id.into(),
791 |provider| provider.transaction_by_id_unhashed(id),
792 |tx_index, _, block_state| {
793 Ok(block_state
794 .block_ref()
795 .recovered_block()
796 .body()
797 .transactions()
798 .get(tx_index)
799 .cloned())
800 },
801 )
802 }
803
804 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
805 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
806 return Ok(Some(tx))
807 }
808
809 self.storage_provider.transaction_by_hash(hash)
810 }
811
812 fn transaction_by_hash_with_meta(
813 &self,
814 tx_hash: TxHash,
815 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
816 if let Some((tx, meta)) =
817 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
818 {
819 return Ok(Some((tx, meta)))
820 }
821
822 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
823 }
824
825 fn transactions_by_block(
826 &self,
827 id: BlockHashOrNumber,
828 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
829 self.get_in_memory_or_storage_by_block(
830 id,
831 |provider| provider.transactions_by_block(id),
832 |block_state| {
833 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
834 },
835 )
836 }
837
838 fn transactions_by_block_range(
839 &self,
840 range: impl RangeBounds<BlockNumber>,
841 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
842 self.get_in_memory_or_storage_by_block_range_while(
843 range,
844 |db_provider, range, _| db_provider.transactions_by_block_range(range),
845 |block_state, _| {
846 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
847 },
848 |_| true,
849 )
850 }
851
852 fn transactions_by_tx_range(
853 &self,
854 range: impl RangeBounds<TxNumber>,
855 ) -> ProviderResult<Vec<Self::Transaction>> {
856 self.get_in_memory_or_storage_by_tx_range(
857 range,
858 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
859 |index_range, block_state| {
860 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
861 .to_vec())
862 },
863 )
864 }
865
866 fn senders_by_tx_range(
867 &self,
868 range: impl RangeBounds<TxNumber>,
869 ) -> ProviderResult<Vec<Address>> {
870 self.get_in_memory_or_storage_by_tx_range(
871 range,
872 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
873 |index_range, block_state| {
874 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
875 },
876 )
877 }
878
879 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
880 self.get_in_memory_or_storage_by_tx(
881 id.into(),
882 |provider| provider.transaction_sender(id),
883 |tx_index, _, block_state| {
884 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
885 },
886 )
887 }
888}
889
890impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
891 type Receipt = ReceiptTy<N>;
892
893 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
894 self.get_in_memory_or_storage_by_tx(
895 id.into(),
896 |provider| provider.receipt(id),
897 |tx_index, _, block_state| {
898 Ok(block_state.executed_block_receipts_ref().get(tx_index).cloned())
899 },
900 )
901 }
902
903 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
904 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
905 let executed_block = block_state.block_ref();
906 let block = executed_block.recovered_block();
907 let receipts = block_state.executed_block_receipts_ref();
908
909 debug_assert_eq!(
911 block.body().transactions().len(),
912 receipts.len(),
913 "Mismatch between transaction and receipt count"
914 );
915
916 if let Some(tx_index) =
917 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
918 {
919 return Ok(receipts.get(tx_index).cloned());
921 }
922 }
923
924 self.storage_provider.receipt_by_hash(hash)
925 }
926
927 fn receipts_by_block(
928 &self,
929 block: BlockHashOrNumber,
930 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
931 self.get_in_memory_or_storage_by_block(
932 block,
933 |db_provider| db_provider.receipts_by_block(block),
934 |block_state| Ok(Some(block_state.executed_block_receipts())),
935 )
936 }
937
938 fn receipts_by_tx_range(
939 &self,
940 range: impl RangeBounds<TxNumber>,
941 ) -> ProviderResult<Vec<Self::Receipt>> {
942 self.get_in_memory_or_storage_by_tx_range(
943 range,
944 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
945 |index_range, block_state| {
946 Ok(block_state.executed_block_receipts_ref()[index_range].to_vec())
947 },
948 )
949 }
950
951 fn receipts_by_block_range(
952 &self,
953 block_range: RangeInclusive<BlockNumber>,
954 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
955 self.storage_provider.receipts_by_block_range(block_range)
956 }
957}
958
959impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
960 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
961 match block {
962 BlockId::Hash(rpc_block_hash) => {
963 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
964 if receipts.is_none() &&
965 !rpc_block_hash.require_canonical.unwrap_or(false) &&
966 let Some(state) = self
967 .head_block
968 .as_ref()
969 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
970 {
971 receipts = Some(state.executed_block_receipts());
972 }
973 Ok(receipts)
974 }
975 BlockId::Number(num_tag) => match num_tag {
976 BlockNumberOrTag::Pending => Ok(self
977 .canonical_in_memory_state
978 .pending_state()
979 .map(|block_state| block_state.executed_block_receipts())),
980 _ => {
981 if let Some(num) = self.convert_block_number(num_tag)? {
982 self.receipts_by_block(num.into())
983 } else {
984 Ok(None)
985 }
986 }
987 },
988 }
989 }
990}
991
992impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
993 fn block_body_indices(
994 &self,
995 number: BlockNumber,
996 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
997 self.get_in_memory_or_storage_by_block(
998 number.into(),
999 |db_provider| db_provider.block_body_indices(number),
1000 |block_state| {
1001 let last_storage_block_number = block_state.anchor().number;
1003 let mut stored_indices = self
1004 .storage_provider
1005 .block_body_indices(last_storage_block_number)?
1006 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1007
1008 stored_indices.first_tx_num = stored_indices.next_tx_num();
1010 stored_indices.tx_count = 0;
1011
1012 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1014 let block_tx_count =
1015 state.block_ref().recovered_block().body().transactions().len() as u64;
1016 if state.block_ref().recovered_block().number() == number {
1017 stored_indices.tx_count = block_tx_count;
1018 } else {
1019 stored_indices.first_tx_num += block_tx_count;
1020 }
1021 }
1022
1023 Ok(Some(stored_indices))
1024 },
1025 )
1026 }
1027
1028 fn block_body_indices_range(
1029 &self,
1030 range: RangeInclusive<BlockNumber>,
1031 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1032 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1033 }
1034}
1035
1036impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1037 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1038 self.storage_provider.get_stage_checkpoint(id)
1039 }
1040
1041 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1042 self.storage_provider.get_stage_checkpoint_progress(id)
1043 }
1044
1045 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1046 self.storage_provider.get_all_checkpoints()
1047 }
1048}
1049
1050impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1051 fn get_prune_checkpoint(
1052 &self,
1053 segment: PruneSegment,
1054 ) -> ProviderResult<Option<PruneCheckpoint>> {
1055 self.storage_provider.get_prune_checkpoint(segment)
1056 }
1057
1058 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1059 self.storage_provider.get_prune_checkpoints()
1060 }
1061}
1062
1063impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1064 type ChainSpec = N::ChainSpec;
1065
1066 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1067 ChainSpecProvider::chain_spec(&self.storage_provider)
1068 }
1069}
1070
1071impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1072 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1073 match id {
1074 BlockId::Number(num) => self.block_by_number_or_tag(num),
1075 BlockId::Hash(hash) => {
1076 if Some(true) == hash.require_canonical {
1081 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1083 } else {
1084 self.block_by_hash(hash.block_hash)
1085 }
1086 }
1087 }
1088 }
1089
1090 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1091 Ok(match id {
1092 BlockNumberOrTag::Latest => {
1093 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1094 }
1095 BlockNumberOrTag::Finalized => {
1096 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1097 }
1098 BlockNumberOrTag::Safe => {
1099 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1100 }
1101 BlockNumberOrTag::Earliest => self.header_by_number(self.earliest_block_number()?)?,
1102 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1103
1104 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1105 })
1106 }
1107
1108 fn sealed_header_by_number_or_tag(
1109 &self,
1110 id: BlockNumberOrTag,
1111 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1112 match id {
1113 BlockNumberOrTag::Latest => {
1114 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1115 }
1116 BlockNumberOrTag::Finalized => {
1117 Ok(self.canonical_in_memory_state.get_finalized_header())
1118 }
1119 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1120 BlockNumberOrTag::Earliest => self
1121 .header_by_number(self.earliest_block_number()?)?
1122 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1123 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1124 BlockNumberOrTag::Number(num) => self
1125 .header_by_number(num)?
1126 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1127 }
1128 }
1129
1130 fn sealed_header_by_id(
1131 &self,
1132 id: BlockId,
1133 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1134 Ok(match id {
1135 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1136 BlockId::Hash(hash) => self
1137 .header(hash.block_hash)?
1138 .map(|header| SealedHeader::new(header, hash.block_hash)),
1139 })
1140 }
1141
1142 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1143 Ok(match id {
1144 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1145 BlockId::Hash(hash) => self.header(hash.block_hash)?,
1146 })
1147 }
1148}
1149
1150impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1151 fn storage_changeset(
1152 &self,
1153 block_number: BlockNumber,
1154 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1155 if let Some(state) =
1156 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1157 {
1158 let changesets = state
1159 .block()
1160 .execution_output
1161 .state
1162 .reverts
1163 .to_plain_state_reverts()
1164 .storage
1165 .into_iter()
1166 .flatten()
1167 .flat_map(|revert: PlainStorageRevert| {
1168 revert.storage_revert.into_iter().map(move |(key, value)| {
1169 let plain_key = B256::from(key.to_be_bytes());
1170 (
1171 BlockNumberAddress((block_number, revert.address)),
1172 StorageEntry { key: plain_key, value: value.to_previous_value() },
1173 )
1174 })
1175 })
1176 .collect();
1177 Ok(changesets)
1178 } else {
1179 let storage_history_exists = self
1183 .storage_provider
1184 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1185 .and_then(|checkpoint| {
1186 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1191 })
1192 .unwrap_or(true);
1193
1194 if !storage_history_exists {
1195 return Err(ProviderError::StateAtBlockPruned(block_number))
1196 }
1197
1198 self.storage_provider.storage_changeset(block_number)
1199 }
1200 }
1201
1202 fn get_storage_before_block(
1203 &self,
1204 block_number: BlockNumber,
1205 address: Address,
1206 storage_key: B256,
1207 ) -> ProviderResult<Option<StorageEntry>> {
1208 if let Some(state) =
1209 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1210 {
1211 let changeset = state
1212 .block_ref()
1213 .execution_output
1214 .state
1215 .reverts
1216 .to_plain_state_reverts()
1217 .storage
1218 .into_iter()
1219 .flatten()
1220 .find_map(|revert: PlainStorageRevert| {
1221 if revert.address != address {
1222 return None
1223 }
1224 revert.storage_revert.into_iter().find_map(|(key, value)| {
1225 let plain_key = B256::from(key.to_be_bytes());
1226 (plain_key == storage_key).then(|| StorageEntry {
1227 key: plain_key,
1228 value: value.to_previous_value(),
1229 })
1230 })
1231 });
1232 Ok(changeset)
1233 } else {
1234 let storage_history_exists = self
1235 .storage_provider
1236 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1237 .and_then(|checkpoint| {
1238 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1239 })
1240 .unwrap_or(true);
1241
1242 if !storage_history_exists {
1243 return Err(ProviderError::StateAtBlockPruned(block_number))
1244 }
1245
1246 self.storage_provider.get_storage_before_block(block_number, address, storage_key)
1247 }
1248 }
1249
1250 fn storage_changesets_range(
1251 &self,
1252 range: impl RangeBounds<BlockNumber>,
1253 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1254 let range = to_range(range);
1255 let mut changesets = Vec::new();
1256 let database_start = range.start;
1257 let mut database_end = range.end;
1258
1259 if let Some(head_block) = &self.head_block {
1260 database_end = head_block.anchor().number;
1261
1262 for state in head_block.chain() {
1263 let block_changesets = state
1264 .block_ref()
1265 .execution_output
1266 .state
1267 .reverts
1268 .to_plain_state_reverts()
1269 .storage
1270 .into_iter()
1271 .flatten()
1272 .flat_map(|revert: PlainStorageRevert| {
1273 revert.storage_revert.into_iter().map(move |(key, value)| {
1274 let plain_key = B256::from(key.to_be_bytes());
1275 (
1276 BlockNumberAddress((state.number(), revert.address)),
1277 StorageEntry { key: plain_key, value: value.to_previous_value() },
1278 )
1279 })
1280 });
1281
1282 changesets.extend(block_changesets);
1283 }
1284 }
1285
1286 if database_start < database_end {
1287 let storage_history_exists = self
1288 .storage_provider
1289 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1290 .and_then(|checkpoint| {
1291 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1292 })
1293 .unwrap_or(true);
1294
1295 if !storage_history_exists {
1296 return Err(ProviderError::StateAtBlockPruned(database_start))
1297 }
1298
1299 let db_changesets = self
1300 .storage_provider
1301 .storage_changesets_range(database_start..=database_end - 1)?;
1302 changesets.extend(db_changesets);
1303 }
1304
1305 changesets.sort_by_key(|(block_address, _)| block_address.block_number());
1306
1307 Ok(changesets)
1308 }
1309}
1310
1311impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1312 fn account_block_changeset(
1313 &self,
1314 block_number: BlockNumber,
1315 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1316 if let Some(state) =
1317 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1318 {
1319 let changesets = state
1320 .block_ref()
1321 .execution_output
1322 .state
1323 .reverts
1324 .to_plain_state_reverts()
1325 .accounts
1326 .into_iter()
1327 .flatten()
1328 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1329 .collect();
1330 Ok(changesets)
1331 } else {
1332 let account_history_exists = self
1336 .storage_provider
1337 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1338 .and_then(|checkpoint| {
1339 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1344 })
1345 .unwrap_or(true);
1346
1347 if !account_history_exists {
1348 return Err(ProviderError::StateAtBlockPruned(block_number))
1349 }
1350
1351 self.storage_provider.account_block_changeset(block_number)
1352 }
1353 }
1354
1355 fn get_account_before_block(
1356 &self,
1357 block_number: BlockNumber,
1358 address: Address,
1359 ) -> ProviderResult<Option<AccountBeforeTx>> {
1360 if let Some(state) =
1361 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1362 {
1363 let changeset = state
1365 .block_ref()
1366 .execution_output
1367 .state
1368 .reverts
1369 .to_plain_state_reverts()
1370 .accounts
1371 .into_iter()
1372 .flatten()
1373 .find(|(addr, _)| addr == &address)
1374 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1375 Ok(changeset)
1376 } else {
1377 let account_history_exists = self
1380 .storage_provider
1381 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1382 .and_then(|checkpoint| {
1383 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1388 })
1389 .unwrap_or(true);
1390
1391 if !account_history_exists {
1392 return Err(ProviderError::StateAtBlockPruned(block_number))
1393 }
1394
1395 self.storage_provider.get_account_before_block(block_number, address)
1397 }
1398 }
1399
1400 fn account_changesets_range(
1401 &self,
1402 range: impl core::ops::RangeBounds<BlockNumber>,
1403 ) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
1404 let range = to_range(range);
1405 let mut changesets = Vec::new();
1406 let database_start = range.start;
1407 let mut database_end = range.end;
1408
1409 if let Some(head_block) = &self.head_block {
1411 database_end = head_block.anchor().number;
1413
1414 for state in head_block.chain() {
1415 let block_changesets = state
1417 .block_ref()
1418 .execution_output
1419 .state
1420 .reverts
1421 .to_plain_state_reverts()
1422 .accounts
1423 .into_iter()
1424 .flatten()
1425 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) });
1426
1427 for changeset in block_changesets {
1428 changesets.push((state.number(), changeset));
1429 }
1430 }
1431 }
1432
1433 if database_start < database_end {
1435 let account_history_exists = self
1437 .storage_provider
1438 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1439 .and_then(|checkpoint| {
1440 checkpoint.block_number.map(|checkpoint| database_start > checkpoint)
1441 })
1442 .unwrap_or(true);
1443
1444 if !account_history_exists {
1445 return Err(ProviderError::StateAtBlockPruned(database_start))
1446 }
1447
1448 let db_changesets =
1449 self.storage_provider.account_changesets_range(database_start..database_end)?;
1450 changesets.extend(db_changesets);
1451 }
1452
1453 changesets.sort_by_key(|(block_num, _)| *block_num);
1454
1455 Ok(changesets)
1456 }
1457}
1458
1459impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1460 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1462 let state_provider = self.latest_ref()?;
1464 state_provider.basic_account(address)
1465 }
1466}
1467
1468impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1469 type Receipt = ReceiptTy<N>;
1470
1471 fn get_state(
1481 &self,
1482 block: BlockNumber,
1483 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1484 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1485 let state = state.block_ref().execution_outcome().clone();
1486 Ok(Some(ExecutionOutcome::from((state, block))))
1487 } else {
1488 self.storage_provider.get_state(block)
1489 }
1490 }
1491}
1492
1493#[cfg(test)]
1494mod tests {
1495 use crate::{
1496 providers::blockchain_provider::BlockchainProvider,
1497 test_utils::create_test_provider_factory, BlockWriter,
1498 };
1499 use alloy_eips::BlockHashOrNumber;
1500 use alloy_primitives::B256;
1501 use itertools::Itertools;
1502 use rand::Rng;
1503 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1504 use reth_db_api::models::AccountBeforeTx;
1505 use reth_ethereum_primitives::Block;
1506 use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome};
1507 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1508 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader, StateReader};
1509 use reth_testing_utils::generators::{
1510 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1511 };
1512 use revm_database::BundleState;
1513 use std::{
1514 ops::{Bound, Range, RangeBounds},
1515 sync::Arc,
1516 };
1517
1518 const TEST_BLOCKS_COUNT: usize = 5;
1519
1520 fn random_blocks(
1521 rng: &mut impl Rng,
1522 database_blocks: usize,
1523 in_memory_blocks: usize,
1524 requests_count: Option<Range<u8>>,
1525 withdrawals_count: Option<Range<u8>>,
1526 tx_count: impl RangeBounds<u8>,
1527 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1528 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1529
1530 let tx_start = match tx_count.start_bound() {
1531 Bound::Included(&n) | Bound::Excluded(&n) => n,
1532 Bound::Unbounded => u8::MIN,
1533 };
1534 let tx_end = match tx_count.end_bound() {
1535 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1536 Bound::Unbounded => u8::MAX,
1537 };
1538
1539 let blocks = random_block_range(
1540 rng,
1541 0..=block_range,
1542 BlockRangeParams {
1543 parent: Some(B256::ZERO),
1544 tx_count: tx_start..tx_end,
1545 requests_count,
1546 withdrawals_count,
1547 },
1548 );
1549 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1550 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1551 }
1552
1553 #[test]
1554 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1555 let mut rng = generators::rng();
1557 let factory = create_test_provider_factory();
1558
1559 let blocks = random_block_range(
1561 &mut rng,
1562 0..=10,
1563 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1564 );
1565 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1566
1567 let provider_rw = factory.provider_rw()?;
1569 for block in database_blocks {
1570 provider_rw.insert_block(
1571 &block.clone().try_recover().expect("failed to seal block with senders"),
1572 )?;
1573 }
1574 provider_rw.commit()?;
1575
1576 let provider = BlockchainProvider::new(factory)?;
1578 let consistent_provider = provider.consistent_provider()?;
1579
1580 let first_db_block = database_blocks.first().unwrap();
1582 let first_in_mem_block = in_memory_blocks.first().unwrap();
1583 let last_in_mem_block = in_memory_blocks.last().unwrap();
1584
1585 assert_eq!(
1587 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1588 None
1589 );
1590 assert_eq!(
1591 consistent_provider
1592 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1593 None
1594 );
1595 assert_eq!(
1597 consistent_provider
1598 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1599 None
1600 );
1601
1602 let in_memory_block_senders =
1604 first_in_mem_block.senders().expect("failed to recover senders");
1605 let chain = NewCanonicalChain::Commit {
1606 new: vec![ExecutedBlock {
1607 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1608 first_in_mem_block.clone(),
1609 in_memory_block_senders,
1610 )),
1611 ..Default::default()
1612 }],
1613 };
1614 consistent_provider.canonical_in_memory_state.update_chain(chain);
1615 let consistent_provider = provider.consistent_provider()?;
1616
1617 assert_eq!(
1619 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1620 Some(first_in_mem_block.clone().into_block())
1621 );
1622 assert_eq!(
1623 consistent_provider
1624 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1625 Some(first_in_mem_block.clone().into_block())
1626 );
1627
1628 assert_eq!(
1630 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1631 Some(first_db_block.clone().into_block())
1632 );
1633 assert_eq!(
1634 consistent_provider
1635 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1636 Some(first_db_block.clone().into_block())
1637 );
1638
1639 assert_eq!(
1641 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1642 None
1643 );
1644
1645 provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1647 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1648 last_in_mem_block.clone(),
1649 Default::default(),
1650 )),
1651 ..Default::default()
1652 });
1653
1654 assert_eq!(
1656 consistent_provider
1657 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1658 Some(last_in_mem_block.clone_block())
1659 );
1660
1661 Ok(())
1662 }
1663
1664 #[test]
1665 fn test_block_reader_block() -> eyre::Result<()> {
1666 let mut rng = generators::rng();
1668 let factory = create_test_provider_factory();
1669
1670 let blocks = random_block_range(
1672 &mut rng,
1673 0..=10,
1674 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1675 );
1676 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1677
1678 let provider_rw = factory.provider_rw()?;
1680 for block in database_blocks {
1681 provider_rw.insert_block(
1682 &block.clone().try_recover().expect("failed to seal block with senders"),
1683 )?;
1684 }
1685 provider_rw.commit()?;
1686
1687 let provider = BlockchainProvider::new(factory)?;
1689 let consistent_provider = provider.consistent_provider()?;
1690
1691 let first_in_mem_block = in_memory_blocks.first().unwrap();
1693 let first_db_block = database_blocks.first().unwrap();
1695
1696 assert_eq!(
1698 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1699 None
1700 );
1701 assert_eq!(
1702 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1703 None
1704 );
1705
1706 let in_memory_block_senders =
1708 first_in_mem_block.senders().expect("failed to recover senders");
1709 let chain = NewCanonicalChain::Commit {
1710 new: vec![ExecutedBlock {
1711 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1712 first_in_mem_block.clone(),
1713 in_memory_block_senders,
1714 )),
1715 ..Default::default()
1716 }],
1717 };
1718 consistent_provider.canonical_in_memory_state.update_chain(chain);
1719
1720 let consistent_provider = provider.consistent_provider()?;
1721
1722 assert_eq!(
1724 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1725 Some(first_in_mem_block.clone().into_block())
1726 );
1727 assert_eq!(
1728 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1729 Some(first_in_mem_block.clone().into_block())
1730 );
1731
1732 assert_eq!(
1734 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1735 Some(first_db_block.clone().into_block())
1736 );
1737 assert_eq!(
1738 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1739 Some(first_db_block.clone().into_block())
1740 );
1741
1742 Ok(())
1743 }
1744
1745 #[test]
1746 fn test_changeset_reader() -> eyre::Result<()> {
1747 let mut rng = generators::rng();
1748
1749 let (database_blocks, in_memory_blocks) =
1750 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1751
1752 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1753 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1754 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1755
1756 let accounts = random_eoa_accounts(&mut rng, 2);
1757
1758 let (database_changesets, database_state) = random_changeset_range(
1759 &mut rng,
1760 &database_blocks,
1761 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1762 0..0,
1763 0..0,
1764 );
1765 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1766 &mut rng,
1767 &in_memory_blocks,
1768 database_state
1769 .iter()
1770 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1771 0..0,
1772 0..0,
1773 );
1774
1775 let factory = create_test_provider_factory();
1776
1777 let provider_rw = factory.provider_rw()?;
1778 provider_rw.append_blocks_with_state(
1779 database_blocks
1780 .into_iter()
1781 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1782 .collect(),
1783 &ExecutionOutcome {
1784 bundle: BundleState::new(
1785 database_state.into_iter().map(|(address, (account, _))| {
1786 (address, None, Some(account.into()), Default::default())
1787 }),
1788 database_changesets.iter().map(|block_changesets| {
1789 block_changesets.iter().map(|(address, account, _)| {
1790 (*address, Some(Some((*account).into())), [])
1791 })
1792 }),
1793 Vec::new(),
1794 ),
1795 first_block: first_database_block,
1796 ..Default::default()
1797 },
1798 Default::default(),
1799 )?;
1800 provider_rw.commit()?;
1801
1802 let provider = BlockchainProvider::new(factory)?;
1803
1804 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1805 let chain = NewCanonicalChain::Commit {
1806 new: vec![in_memory_blocks
1807 .first()
1808 .map(|block| {
1809 let senders = block.senders().expect("failed to recover senders");
1810 ExecutedBlock {
1811 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1812 block.clone(),
1813 senders,
1814 )),
1815 execution_output: Arc::new(BlockExecutionOutput {
1816 state: BundleState::new(
1817 in_memory_state.into_iter().map(|(address, (account, _))| {
1818 (address, None, Some(account.into()), Default::default())
1819 }),
1820 [in_memory_changesets.iter().map(|(address, account, _)| {
1821 (*address, Some(Some((*account).into())), Vec::new())
1822 })],
1823 [],
1824 ),
1825 result: BlockExecutionResult {
1826 receipts: Default::default(),
1827 requests: Default::default(),
1828 gas_used: 0,
1829 blob_gas_used: 0,
1830 },
1831 }),
1832 ..Default::default()
1833 }
1834 })
1835 .unwrap()],
1836 };
1837 provider.canonical_in_memory_state.update_chain(chain);
1838
1839 let consistent_provider = provider.consistent_provider()?;
1840
1841 assert_eq!(
1842 consistent_provider.account_block_changeset(last_database_block).unwrap(),
1843 database_changesets
1844 .into_iter()
1845 .next_back()
1846 .unwrap()
1847 .into_iter()
1848 .sorted_by_key(|(address, _, _)| *address)
1849 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1850 .collect::<Vec<_>>()
1851 );
1852 assert_eq!(
1853 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1854 in_memory_changesets
1855 .into_iter()
1856 .sorted_by_key(|(address, _, _)| *address)
1857 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1858 .collect::<Vec<_>>()
1859 );
1860
1861 Ok(())
1862 }
1863 #[test]
1864 fn test_get_state_storage_value_plain_state() -> eyre::Result<()> {
1865 use alloy_primitives::U256;
1866 use reth_db_api::{models::StorageSettings, tables, transaction::DbTxMut};
1867 use reth_primitives_traits::StorageEntry;
1868 use reth_storage_api::StorageSettingsCache;
1869 use std::collections::HashMap;
1870
1871 let address = alloy_primitives::Address::with_last_byte(1);
1872 let account = reth_primitives_traits::Account {
1873 nonce: 1,
1874 balance: U256::from(1000),
1875 bytecode_hash: None,
1876 };
1877 let slot = U256::from(0x42);
1878 let slot_b256 = B256::from(slot);
1879
1880 let mut rng = generators::rng();
1881 let factory = create_test_provider_factory();
1882 factory.set_storage_settings_cache(StorageSettings::v1());
1883
1884 let blocks = random_block_range(
1885 &mut rng,
1886 0..=1,
1887 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1888 );
1889
1890 let provider_rw = factory.provider_rw()?;
1891 provider_rw.append_blocks_with_state(
1892 blocks
1893 .into_iter()
1894 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1895 .collect(),
1896 &ExecutionOutcome {
1897 bundle: BundleState::new(
1898 [(address, None, Some(account.into()), {
1899 let mut s = HashMap::default();
1900 s.insert(slot, (U256::ZERO, U256::from(100)));
1901 s
1902 })],
1903 [
1904 Vec::new(),
1905 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
1906 ],
1907 [],
1908 ),
1909 first_block: 0,
1910 ..Default::default()
1911 },
1912 Default::default(),
1913 )?;
1914
1915 provider_rw.tx_ref().put::<tables::PlainStorageState>(
1916 address,
1917 StorageEntry { key: slot_b256, value: U256::from(100) },
1918 )?;
1919 provider_rw.tx_ref().put::<tables::PlainAccountState>(address, account)?;
1920
1921 provider_rw.commit()?;
1922
1923 let provider = BlockchainProvider::new(factory)?;
1924 let consistent_provider = provider.consistent_provider()?;
1925
1926 let outcome = consistent_provider.get_state(1)?.expect("should return execution outcome");
1927
1928 let state = &outcome.bundle.state;
1929 let account_state = state.get(&address).expect("should have account in bundle state");
1930 let storage = &account_state.storage;
1931
1932 let storage_slot = storage.get(&slot).expect("should have the slot in storage");
1933
1934 assert_eq!(
1935 storage_slot.present_value,
1936 U256::from(100),
1937 "present_value should be 100 (the actual value in PlainStorageState)"
1938 );
1939
1940 Ok(())
1941 }
1942
1943 #[test]
1944 fn test_storage_changeset_consistent_keys_plain_state() -> eyre::Result<()> {
1945 use alloy_primitives::U256;
1946 use reth_db_api::models::StorageSettings;
1947 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
1948 use std::collections::HashMap;
1949
1950 let mut rng = generators::rng();
1951 let factory = create_test_provider_factory();
1952 factory.set_storage_settings_cache(StorageSettings::v1());
1953
1954 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 1, 1, None, None, 0..1);
1955
1956 let address = alloy_primitives::Address::with_last_byte(1);
1957 let account = reth_primitives_traits::Account {
1958 nonce: 1,
1959 balance: U256::from(1000),
1960 bytecode_hash: None,
1961 };
1962 let slot = U256::from(0x42);
1963
1964 let provider_rw = factory.provider_rw()?;
1965 provider_rw.append_blocks_with_state(
1966 database_blocks
1967 .into_iter()
1968 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1969 .collect(),
1970 &ExecutionOutcome {
1971 bundle: BundleState::new(
1972 [(address, None, Some(account.into()), {
1973 let mut s = HashMap::default();
1974 s.insert(slot, (U256::ZERO, U256::from(100)));
1975 s
1976 })],
1977 [[(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])]],
1978 [],
1979 ),
1980 first_block: 0,
1981 ..Default::default()
1982 },
1983 Default::default(),
1984 )?;
1985 provider_rw.commit()?;
1986
1987 let provider = BlockchainProvider::new(factory)?;
1988
1989 let in_mem_block = in_memory_blocks.first().unwrap();
1990 let senders = in_mem_block.senders().expect("failed to recover senders");
1991 let chain = NewCanonicalChain::Commit {
1992 new: vec![ExecutedBlock {
1993 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1994 in_mem_block.clone(),
1995 senders,
1996 )),
1997 execution_output: Arc::new(BlockExecutionOutput {
1998 state: BundleState::new(
1999 [(address, None, Some(account.into()), {
2000 let mut s = HashMap::default();
2001 s.insert(slot, (U256::from(100), U256::from(200)));
2002 s
2003 })],
2004 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2005 [],
2006 ),
2007 result: BlockExecutionResult {
2008 receipts: Default::default(),
2009 requests: Default::default(),
2010 gas_used: 0,
2011 blob_gas_used: 0,
2012 },
2013 }),
2014 ..Default::default()
2015 }],
2016 };
2017 provider.canonical_in_memory_state.update_chain(chain);
2018
2019 let consistent_provider = provider.consistent_provider()?;
2020
2021 let db_changeset = consistent_provider.storage_changeset(0)?;
2022 let mem_changeset = consistent_provider.storage_changeset(1)?;
2023
2024 let slot_b256 = B256::from(slot);
2025
2026 assert_eq!(db_changeset.len(), 1);
2027 assert_eq!(mem_changeset.len(), 1);
2028
2029 let db_key = db_changeset[0].1.key;
2030 let mem_key = mem_changeset[0].1.key;
2031
2032 assert_eq!(db_key, slot_b256, "DB changeset should use plain (unhashed) key");
2033 assert_eq!(mem_key, slot_b256, "In-memory changeset should use plain (unhashed) key");
2034 assert_eq!(
2035 db_key, mem_key,
2036 "DB and in-memory changesets should return the same key format (plain) for the same logical slot"
2037 );
2038
2039 Ok(())
2040 }
2041
2042 #[test]
2043 fn test_storage_changesets_range_consistent_keys_plain_state() -> eyre::Result<()> {
2044 use alloy_primitives::U256;
2045 use reth_db_api::models::StorageSettings;
2046 use reth_storage_api::{StorageChangeSetReader, StorageSettingsCache};
2047 use std::collections::HashMap;
2048
2049 let mut rng = generators::rng();
2050 let factory = create_test_provider_factory();
2051 factory.set_storage_settings_cache(StorageSettings::v1());
2052
2053 let (database_blocks, in_memory_blocks) = random_blocks(&mut rng, 2, 1, None, None, 0..1);
2054
2055 let address = alloy_primitives::Address::with_last_byte(1);
2056 let account = reth_primitives_traits::Account {
2057 nonce: 1,
2058 balance: U256::from(1000),
2059 bytecode_hash: None,
2060 };
2061 let slot = U256::from(0x42);
2062
2063 let provider_rw = factory.provider_rw()?;
2064 provider_rw.append_blocks_with_state(
2065 database_blocks
2066 .into_iter()
2067 .map(|b| b.try_recover().expect("failed to seal block with senders"))
2068 .collect(),
2069 &ExecutionOutcome {
2070 bundle: BundleState::new(
2071 [(address, None, Some(account.into()), {
2072 let mut s = HashMap::default();
2073 s.insert(slot, (U256::ZERO, U256::from(100)));
2074 s
2075 })],
2076 vec![
2077 vec![(address, Some(Some(account.into())), vec![(slot, U256::ZERO)])],
2078 vec![],
2079 ],
2080 [],
2081 ),
2082 first_block: 0,
2083 ..Default::default()
2084 },
2085 Default::default(),
2086 )?;
2087 provider_rw.commit()?;
2088
2089 let provider = BlockchainProvider::new(factory)?;
2090
2091 let in_mem_block = in_memory_blocks.first().unwrap();
2092 let senders = in_mem_block.senders().expect("failed to recover senders");
2093 let chain = NewCanonicalChain::Commit {
2094 new: vec![ExecutedBlock {
2095 recovered_block: Arc::new(RecoveredBlock::new_sealed(
2096 in_mem_block.clone(),
2097 senders,
2098 )),
2099 execution_output: Arc::new(BlockExecutionOutput {
2100 state: BundleState::new(
2101 [(address, None, Some(account.into()), {
2102 let mut s = HashMap::default();
2103 s.insert(slot, (U256::from(100), U256::from(200)));
2104 s
2105 })],
2106 [[(address, Some(Some(account.into())), vec![(slot, U256::from(100))])]],
2107 [],
2108 ),
2109 result: BlockExecutionResult {
2110 receipts: Default::default(),
2111 requests: Default::default(),
2112 gas_used: 0,
2113 blob_gas_used: 0,
2114 },
2115 }),
2116 ..Default::default()
2117 }],
2118 };
2119 provider.canonical_in_memory_state.update_chain(chain);
2120
2121 let consistent_provider = provider.consistent_provider()?;
2122
2123 let all_changesets = consistent_provider.storage_changesets_range(0..=2)?;
2124
2125 assert_eq!(all_changesets.len(), 2, "should have one changeset entry per block");
2126
2127 let slot_b256 = B256::from(slot);
2128 let keys: Vec<B256> = all_changesets.iter().map(|(_, entry)| entry.key).collect();
2129
2130 assert_eq!(
2131 keys[0], keys[1],
2132 "same logical slot should produce identical keys whether from DB or memory"
2133 );
2134 assert_eq!(
2135 keys[0], slot_b256,
2136 "keys should be plain/unhashed when use_hashed_state is false"
2137 );
2138
2139 Ok(())
2140 }
2141}