1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eip7928::bal::DecodedBal;
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::{Address, TxHash, B256};
9use futures::{stream::FuturesOrdered, Stream, StreamExt};
10use reth_chain_state::CanonStateNotification;
11use reth_errors::{ProviderError, ProviderResult};
12use reth_execution_types::Chain;
13use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
14use reth_revm::{
15 bytecode::Bytecode,
16 primitives::{StorageKey, StorageValue},
17 state::bal::{
18 AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
19 BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
20 },
21};
22use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
23use reth_tasks::Runtime;
24use schnellru::{ByLength, Limiter, LruMap};
25use std::{
26 future::Future,
27 pin::Pin,
28 sync::Arc,
29 task::{Context, Poll},
30};
31use tokio::sync::{
32 mpsc::{unbounded_channel, UnboundedSender},
33 oneshot, Semaphore,
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36
37pub mod config;
38pub mod db;
39pub mod metrics;
40pub mod multi_consumer;
41
/// Oneshot sender used to answer a request for a recovered block (block plus senders).
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// Oneshot sender used to answer a request for all receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// Oneshot sender used to answer a cache-only block lookup (no provider fallback).
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// Oneshot sender used to answer a cache-only lookup of both the block and its receipts.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// Oneshot sender used to answer a header request.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// Oneshot sender used to answer a request for a chain of cached parent blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// Oneshot sender used to answer a cache-only transaction lookup by hash.
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// Oneshot sender used to answer a block access list (BAL) request.
type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;

/// LRU cache of recovered blocks keyed by block hash, with [`BlockWithSendersResponseSender`]s
/// as the queued consumers.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// LRU cache of block receipts keyed by block hash, with [`ReceiptsResponseSender`]s as the
/// queued consumers.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// LRU cache of headers keyed by block hash, with [`HeaderResponseSender`]s as the queued
/// consumers.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;

/// LRU cache of decoded block access lists keyed by block hash, with [`BalResponseSender`]s as
/// the queued consumers.
type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
75
/// Frontend handle to the cache service.
///
/// Each request is wrapped in a [`CacheAction`] and pushed onto an unbounded channel that is
/// drained by [`EthStateCacheService`]; the answer comes back through a oneshot channel created
/// per request.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Sending half of the action channel consumed by the service.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
84
85impl<N: NodePrimitives> Clone for EthStateCache<N> {
86 fn clone(&self) -> Self {
87 Self { to_service: self.to_service.clone() }
88 }
89}
90
91impl<N: NodePrimitives> EthStateCache<N> {
92 fn create<Provider>(
94 provider: Provider,
95 action_task_spawner: Runtime,
96 config: EthStateCacheConfig,
97 ) -> (Self, EthStateCacheService<Provider, Runtime>)
98 where
99 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
100 {
101 let EthStateCacheConfig {
102 max_blocks,
103 max_receipts,
104 max_headers,
105 max_bals,
106 max_concurrent_db_requests,
107 max_cached_tx_hashes,
108 } = config;
109 let (to_service, rx) = unbounded_channel();
110
111 let service = EthStateCacheService {
112 provider,
113 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
114 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
115 headers_cache: HeaderLruCache::new(max_headers, "headers"),
116 bal_cache: BalLruCache::new(max_bals, "bals"),
117 action_tx: to_service.clone(),
118 action_rx: UnboundedReceiverStream::new(rx),
119 action_task_spawner,
120 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
121 tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
122 };
123 let cache = Self { to_service };
124 (cache, service)
125 }
126
127 pub fn spawn_with<Provider>(
132 provider: Provider,
133 config: EthStateCacheConfig,
134 executor: Runtime,
135 ) -> Self
136 where
137 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
138 + BalProvider
139 + Clone
140 + Unpin
141 + 'static,
142 {
143 let (this, service) = Self::create(provider, executor.clone(), config);
144 executor.spawn_critical_task("eth state cache", service);
145 this
146 }
147
148 pub async fn get_recovered_block(
152 &self,
153 block_hash: B256,
154 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
155 let (response_tx, rx) = oneshot::channel();
156 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
157 rx.await.map_err(|_| CacheServiceUnavailable)?
158 }
159
160 pub async fn get_receipts(
164 &self,
165 block_hash: B256,
166 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
167 let (response_tx, rx) = oneshot::channel();
168 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
169 rx.await.map_err(|_| CacheServiceUnavailable)?
170 }
171
172 pub async fn get_block_and_receipts(
174 &self,
175 block_hash: B256,
176 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
177 let block = self.get_recovered_block(block_hash);
178 let receipts = self.get_receipts(block_hash);
179
180 let (block, receipts) = futures::try_join!(block, receipts)?;
181
182 Ok(block.zip(receipts))
183 }
184
185 pub async fn get_receipts_and_maybe_block(
187 &self,
188 block_hash: B256,
189 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
190 let (response_tx, rx) = oneshot::channel();
191 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
192
193 let receipts = self.get_receipts(block_hash);
194
195 let (receipts, block) = futures::join!(receipts, rx);
196
197 let block = block.map_err(|_| CacheServiceUnavailable)?;
198 Ok(receipts?.map(|r| (r, block)))
199 }
200
201 pub async fn maybe_cached_block_and_receipts(
203 &self,
204 block_hash: B256,
205 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
206 let (response_tx, rx) = oneshot::channel();
207 let _ = self
208 .to_service
209 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
210 rx.await.map_err(|_| CacheServiceUnavailable.into())
211 }
212
213 #[expect(clippy::type_complexity)]
215 pub fn get_receipts_and_maybe_block_stream<'a>(
216 &'a self,
217 hashes: Vec<B256>,
218 ) -> impl Stream<
219 Item = ProviderResult<
220 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
221 >,
222 > + 'a {
223 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
224
225 futures.collect::<FuturesOrdered<_>>()
226 }
227
228 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
232 let (response_tx, rx) = oneshot::channel();
233 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
234 rx.await.map_err(|_| CacheServiceUnavailable)?
235 }
236
237 pub async fn get_cached_parent_blocks(
245 &self,
246 block_hash: B256,
247 max_blocks: usize,
248 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
249 let (response_tx, rx) = oneshot::channel();
250 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
251 block_hash,
252 max_blocks,
253 response_tx,
254 });
255
256 let blocks = rx.await.unwrap_or_default();
257 if blocks.is_empty() {
258 None
259 } else {
260 Some(blocks)
261 }
262 }
263
264 pub async fn get_transaction_by_hash(
269 &self,
270 tx_hash: TxHash,
271 ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
272 let (response_tx, rx) = oneshot::channel();
273 let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
274 rx.await.ok()?
275 }
276
277 pub async fn get_bal(
281 &self,
282 block_hash: B256,
283 ) -> ProviderResult<Option<Arc<DecodedBal<Arc<RevmBal>>>>> {
284 let (response_tx, rx) = oneshot::channel();
285 let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
286 rx.await
287 .map_err(|_| CacheServiceUnavailable)?
288 .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
289 }
290}
/// Error reported when a request could not be answered because the response channel was dropped
/// or a fetch task ended without delivering its result.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;
295
296impl From<CacheServiceUnavailable> for ProviderError {
297 fn from(err: CacheServiceUnavailable) -> Self {
298 Self::other(err)
299 }
300}
301
/// The service that owns the caches and answers the requests sent through [`EthStateCache`].
///
/// It is a [`Future`] that drains the action channel. On a cache miss it queues the response
/// sender and, for the first request of a given hash, spawns a fetch task that reports its
/// result back through the same action channel.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
    LimitBals = ByLength,
> where
    Provider: BlockReader + BalProvider,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
    LimitBals: Limiter<B256, CachedRevmBal>,
{
    /// Provider queried by fetch tasks on cache misses.
    provider: Provider,
    /// Cache of recovered blocks, keyed by block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// Cache of block receipts, keyed by block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// Cache of headers, keyed by block hash.
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Cache of decoded block access lists, keyed by block hash.
    bal_cache: BalLruCache<LimitBals>,
    /// Sender half of the action channel; cloned into fetch tasks so they can report results.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel, polled by the service.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Spawner used for the blocking fetch tasks.
    action_task_spawner: Tasks,
    /// Semaphore acquired by every fetch task before it queries the provider.
    rate_limiter: Arc<Semaphore>,
    /// Index from transaction hash to `(block hash, index of the transaction in that block)`.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
359
360impl<Provider> EthStateCacheService<Provider, Runtime>
361where
362 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
363{
364 fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
366 let block_hash = block.hash();
367 for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
368 self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
369 }
370 }
371
372 fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
374 for tx in block.body().transactions() {
375 self.tx_hash_index.remove(tx.tx_hash());
376 }
377 }
378
379 fn on_new_block(
380 &mut self,
381 block_hash: B256,
382 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
383 ) {
384 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
385 for tx in queued {
387 let _ = tx.send(res.clone());
388 }
389 }
390
391 if let Ok(Some(block)) = res {
393 self.full_block_cache.insert(block_hash, block);
394 }
395 }
396
397 fn on_new_receipts(
398 &mut self,
399 block_hash: B256,
400 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
401 ) {
402 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
403 for tx in queued {
405 let _ = tx.send(res.clone());
406 }
407 }
408
409 if let Ok(Some(receipts)) = res {
411 self.receipts_cache.insert(block_hash, receipts);
412 }
413 }
414
415 fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
416 if let Some(queued) = self.bal_cache.remove(&block_hash) {
417 for tx in queued {
418 let _ = tx.send(res.clone());
419 }
420 }
421
422 if let Ok(Some(bal)) = res {
423 self.bal_cache.insert(block_hash, bal);
424 }
425 }
426
427 fn on_reorg_block(
428 &mut self,
429 block_hash: B256,
430 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
431 ) {
432 let res = res.map(|b| b.map(Arc::new));
433 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
434 for tx in queued {
436 let _ = tx.send(res.clone());
437 }
438 }
439 }
440
441 fn on_reorg_receipts(
442 &mut self,
443 block_hash: B256,
444 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
445 ) {
446 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
447 for tx in queued {
449 let _ = tx.send(res.clone());
450 }
451 }
452 }
453
454 fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
455 if let Some(queued) = self.headers_cache.remove(&block_hash) {
456 for tx in queued {
458 let _ = tx.send(res.clone());
459 }
460 }
461 }
462
463 fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
464 if let Some(queued) = self.bal_cache.remove(&block_hash) {
465 for tx in queued {
466 let _ = tx.send(res.clone());
467 }
468 }
469 }
470
471 fn shrink_queues(&mut self) {
473 let min_capacity = 2;
474 self.full_block_cache.shrink_to(min_capacity);
475 self.receipts_cache.shrink_to(min_capacity);
476 self.headers_cache.shrink_to(min_capacity);
477 self.bal_cache.shrink_to(min_capacity);
478 }
479
480 fn update_cached_metrics(&self) {
481 self.full_block_cache.update_cached_metrics();
482 self.receipts_cache.update_cached_metrics();
483 self.headers_cache.update_cached_metrics();
484 self.bal_cache.update_cached_metrics();
485 }
486}
487
488impl<Provider> Future for EthStateCacheService<Provider, Runtime>
489where
490 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
491{
492 type Output = ();
493
494 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
495 let this = self.get_mut();
496
497 loop {
498 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
499 this.shrink_queues();
501 return Poll::Pending;
502 };
503
504 match action {
505 None => {
506 unreachable!("can't close")
507 }
508 Some(action) => {
509 match action {
510 CacheAction::GetCachedBlock { block_hash, response_tx } => {
511 let _ =
512 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
513 }
514 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
515 let block = this.full_block_cache.get(&block_hash).cloned();
516 let receipts = this.receipts_cache.get(&block_hash).cloned();
517 let _ = response_tx.send((block, receipts));
518 }
519 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
520 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
521 let _ = response_tx.send(Ok(Some(block)));
522 continue
523 }
524
525 if this.full_block_cache.queue(block_hash, response_tx) {
527 let provider = this.provider.clone();
528 let action_tx = this.action_tx.clone();
529 let rate_limiter = this.rate_limiter.clone();
530 let mut action_sender =
531 ActionSender::new(CacheKind::Block, block_hash, action_tx);
532 this.action_task_spawner.spawn_blocking_task(async move {
533 let _permit = rate_limiter.acquire().await;
535 let block_sender = provider
538 .sealed_block_with_senders(
539 BlockHashOrNumber::Hash(block_hash),
540 TransactionVariant::WithHash,
541 )
542 .map(|maybe_block| maybe_block.map(Arc::new));
543 action_sender.send_block(block_sender);
544 });
545 }
546 }
547 CacheAction::GetReceipts { block_hash, response_tx } => {
548 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
550 let _ = response_tx.send(Ok(Some(receipts)));
551 continue
552 }
553
554 if this.receipts_cache.queue(block_hash, response_tx) {
556 let provider = this.provider.clone();
557 let action_tx = this.action_tx.clone();
558 let rate_limiter = this.rate_limiter.clone();
559 let mut action_sender =
560 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
561 this.action_task_spawner.spawn_blocking_task(async move {
562 let _permit = rate_limiter.acquire().await;
564 let res = provider
565 .receipts_by_block(block_hash.into())
566 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
567
568 action_sender.send_receipts(res);
569 });
570 }
571 }
572 CacheAction::GetHeader { block_hash, response_tx } => {
573 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
575 let _ = response_tx.send(Ok(header));
576 continue
577 }
578
579 if let Some(block) = this.full_block_cache.get(&block_hash) {
581 let _ = response_tx.send(Ok(block.clone_header()));
582 continue
583 }
584
585 if this.headers_cache.queue(block_hash, response_tx) {
588 let provider = this.provider.clone();
589 let action_tx = this.action_tx.clone();
590 let rate_limiter = this.rate_limiter.clone();
591 let mut action_sender =
592 ActionSender::new(CacheKind::Header, block_hash, action_tx);
593 this.action_task_spawner.spawn_blocking_task(async move {
594 let _permit = rate_limiter.acquire().await;
596 let header = provider.header(block_hash).and_then(|header| {
597 header.ok_or_else(|| {
598 ProviderError::HeaderNotFound(block_hash.into())
599 })
600 });
601 action_sender.send_header(header);
602 });
603 }
604 }
605 CacheAction::GetBal { block_hash, response_tx } => {
606 if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
607 let _ = response_tx.send(Ok(Some(bal)));
608 continue
609 }
610
611 if this.bal_cache.queue(block_hash, response_tx) {
612 let provider = this.provider.clone();
613 let action_tx = this.action_tx.clone();
614 let rate_limiter = this.rate_limiter.clone();
615 let mut action_sender =
616 ActionSender::new(CacheKind::Bal, block_hash, action_tx);
617 this.action_task_spawner.spawn_blocking_task(async move {
618 let _permit = rate_limiter.acquire().await;
619 let res = provider
620 .bal_store()
621 .revm_bal_by_hash(block_hash)
622 .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
623 action_sender.send_bal(res);
624 });
625 }
626 }
627 CacheAction::ReceiptsResult { block_hash, res } => {
628 this.on_new_receipts(block_hash, res);
629 }
630 CacheAction::BalResult { block_hash, res } => {
631 this.on_new_bal(block_hash, res);
632 }
633 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
634 Ok(Some(block_with_senders)) => {
635 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
636 }
637 Ok(None) => {
638 this.on_new_block(block_hash, Ok(None));
639 }
640 Err(e) => {
641 this.on_new_block(block_hash, Err(e));
642 }
643 },
644 CacheAction::HeaderResult { block_hash, res } => {
645 let res = *res;
646 if let Some(queued) = this.headers_cache.remove(&block_hash) {
647 for tx in queued {
649 let _ = tx.send(res.clone());
650 }
651 }
652
653 if let Ok(data) = res {
655 this.headers_cache.insert(block_hash, data);
656 }
657 }
658 CacheAction::CacheNewCanonicalChain { chain_change } => {
659 for block in chain_change.blocks {
660 this.index_block_transactions(&block);
662 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
663 }
664
665 for block_receipts in chain_change.receipts {
666 this.on_new_receipts(
667 block_receipts.block_hash,
668 Ok(Some(Arc::new(block_receipts.receipts))),
669 );
670 }
671 }
672 CacheAction::RemoveReorgedChain { chain_change } => {
673 for block in chain_change.blocks {
674 let block_hash = block.hash();
675 let header = block.clone_header();
676 this.remove_block_transactions(&block);
678 this.on_reorg_block(block_hash, Ok(Some(block)));
679 this.on_reorg_header(block_hash, Ok(header));
680 this.on_reorg_bal(block_hash, Ok(None));
681 }
682
683 for block_receipts in chain_change.receipts {
684 this.on_reorg_receipts(
685 block_receipts.block_hash,
686 Ok(Some(Arc::new(block_receipts.receipts))),
687 );
688 }
689 }
690 CacheAction::GetCachedParentBlocks {
691 block_hash,
692 max_blocks,
693 response_tx,
694 } => {
695 let mut blocks = Vec::new();
696 let mut current_hash = block_hash;
697
698 while blocks.len() < max_blocks {
700 if let Some(block) =
701 this.full_block_cache.get(¤t_hash).cloned()
702 {
703 current_hash = block.header().parent_hash();
705 blocks.push(block);
706 } else {
707 break;
709 }
710 }
711
712 let _ = response_tx.send(blocks);
713 }
714 CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
715 let result =
716 this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
717 let block = this.full_block_cache.get(block_hash).cloned()?;
718 let receipts = this.receipts_cache.get(block_hash).cloned();
719 Some(CachedTransaction::new(block, *idx, receipts))
720 });
721 let _ = response_tx.send(result);
722 }
723 };
724 this.update_cached_metrics();
725 }
726 }
727 }
728 }
729}
730
/// All messages processed by the cache service: requests from [`EthStateCache`] handles, results
/// reported by fetch tasks, and chain updates.
enum CacheAction<B: Block, R> {
    /// Request for a recovered block; falls back to the provider on a cache miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for a header; falls back to the provider on a cache miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for the receipts of a block; falls back to the provider on a cache miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Request for the block access list of a block; falls back to the provider on a cache miss.
    GetBal {
        block_hash: B256,
        response_tx: BalResponseSender,
    },
    /// Cache-only lookup of a block.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Cache-only lookup of a block and its receipts.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Result of a block fetch, reported by a fetch task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Result of a receipts fetch, reported by a fetch task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Result of a header fetch, reported by a fetch task. The result is boxed.
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Result of a BAL fetch, reported by a fetch task.
    BalResult {
        block_hash: B256,
        res: ProviderResult<Option<CachedRevmBal>>,
    },
    /// Newly committed blocks and receipts that should be cached and indexed.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Reverted blocks and receipts that should be removed from the caches.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Cache-only lookup of up to `max_blocks` blocks, starting at `block_hash` and following
    /// parent hashes.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Cache-only lookup of a transaction by its hash.
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
790
/// The receipts of a single block, tagged with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// All receipts of the block.
    receipts: Vec<R>,
}
795
/// Owned snapshot of the blocks and receipts of a chain segment, as sent to the cache service.
struct ChainChange<B: Block, R> {
    /// The blocks of the segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// The receipts of the segment, one entry per block.
    receipts: Vec<BlockReceipts<R>>,
}
801
802impl<B: Block, R: Clone> ChainChange<B, R> {
803 fn new<N>(chain: Arc<Chain<N>>) -> Self
804 where
805 N: NodePrimitives<Block = B, Receipt = R>,
806 {
807 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
808 .blocks_and_receipts()
809 .map(|(block, receipts)| {
810 let block_receipts =
811 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
812 (block.clone(), block_receipts)
813 })
814 .unzip();
815 Self { blocks, receipts }
816 }
817}
818
/// Identifies which kind of data an [`ActionSender`] is fetching, so that its `Drop` impl can
/// build the matching error result.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// A recovered block.
    Block,
    /// The receipts of a block.
    Receipt,
    /// A header.
    Header,
    /// A block access list.
    Bal,
}
827
/// Drop-aware sender for the result of a single fetch task.
///
/// The sender is consumed by the first `send_*` call. If it is dropped while still holding the
/// sender, the `Drop` impl reports a [`CacheServiceUnavailable`] error for `blockhash` instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Kind of data being fetched; selects the result variant sent on drop.
    kind: CacheKind,
    /// Hash of the block the fetch is for.
    blockhash: B256,
    /// Channel back to the service; `None` once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
838
839impl<R: Send + Sync, B: Block> ActionSender<B, R> {
840 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
841 Self { kind, blockhash, tx: Some(tx) }
842 }
843
844 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
845 if let Some(tx) = self.tx.take() {
846 let _ = tx.send(CacheAction::BlockWithSendersResult {
847 block_hash: self.blockhash,
848 res: block_sender,
849 });
850 }
851 }
852
853 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
854 if let Some(tx) = self.tx.take() {
855 let _ =
856 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
857 }
858 }
859
860 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
861 if let Some(tx) = self.tx.take() {
862 let _ = tx.send(CacheAction::HeaderResult {
863 block_hash: self.blockhash,
864 res: Box::new(header),
865 });
866 }
867 }
868
869 fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
870 if let Some(tx) = self.tx.take() {
871 let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
872 }
873 }
874}
875impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
876 fn drop(&mut self) {
877 if let Some(tx) = self.tx.take() {
878 let msg = match self.kind {
879 CacheKind::Block => CacheAction::BlockWithSendersResult {
880 block_hash: self.blockhash,
881 res: Err(CacheServiceUnavailable.into()),
882 },
883 CacheKind::Receipt => CacheAction::ReceiptsResult {
884 block_hash: self.blockhash,
885 res: Err(CacheServiceUnavailable.into()),
886 },
887 CacheKind::Header => CacheAction::HeaderResult {
888 block_hash: self.blockhash,
889 res: Box::new(Err(CacheServiceUnavailable.into())),
890 },
891 CacheKind::Bal => CacheAction::BalResult {
892 block_hash: self.blockhash,
893 res: Err(CacheServiceUnavailable.into()),
894 },
895 };
896 let _ = tx.send(msg);
897 }
898 }
899}
900
901pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
906 eth_state_cache: EthStateCache<N>,
907 mut events: St,
908) where
909 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
910{
911 while let Some(event) = events.next().await {
912 if let Some(reverted) = event.reverted() {
913 let chain_change = ChainChange::new(reverted);
914
915 let _ =
916 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
917 }
918
919 let chain_change = ChainChange::new(event.committed());
920
921 let _ =
922 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
923 }
924}
925
/// Cache entry wrapping a shared, decoded block access list.
///
/// The newtype exists so that [`InMemorySize`] can be implemented for the cached value.
#[derive(Clone, Debug)]
pub(crate) struct CachedRevmBal(Arc<DecodedBal<Arc<RevmBal>>>);
929
930impl CachedRevmBal {
931 #[inline]
933 fn new(bal: DecodedBal<Arc<RevmBal>>) -> Self {
934 Self(Arc::new(bal))
935 }
936}
937
938impl InMemorySize for CachedRevmBal {
939 fn size(&self) -> usize {
940 core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
941 }
942}
943
944fn decoded_revm_bal_size(bal: &DecodedBal<Arc<RevmBal>>) -> usize {
945 core::mem::size_of::<DecodedBal<Arc<RevmBal>>>() +
946 bal.as_raw().len() +
947 revm_bal_size(bal.as_bal())
948}
949
950fn revm_bal_size(bal: &Arc<RevmBal>) -> usize {
951 core::mem::size_of::<RevmBal>() +
952 bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
953 bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
954}
955
956fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
957 revm_account_info_bal_heap_size(&account.account_info) +
958 revm_storage_bal_heap_size(&account.storage)
959}
960
961fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
962 revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
963 revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
964 revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
965}
966
967fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
968 storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
969 storage
970 .storage
971 .values()
972 .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
973 .sum::<usize>()
974}
975
976fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
977where
978 T: PartialEq + Clone,
979 F: FnMut(&T) -> usize,
980{
981 writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
982 writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
983}
984
985fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
986 bytecode.bytes_ref().len()
987}
988
989#[cfg(test)]
990mod tests {
991 use super::*;
992 use alloy_consensus::{transaction::TransactionMeta, Header};
993 use alloy_eip7928::BlockAccessIndex;
994 use alloy_eips::{BlockHashOrNumber, NumHash};
995 use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
996 use core::ops::{RangeBounds, RangeInclusive};
997 use reth_db_models::StoredBlockBodyIndices;
998 use reth_ethereum_primitives::{
999 Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
1000 };
1001 use reth_primitives_traits::{RecoveredBlock, SealedHeader};
1002 use reth_storage_api::{
1003 noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1004 BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1005 TransactionVariant, TransactionsProvider,
1006 };
1007 use std::sync::atomic::{AtomicUsize, Ordering};
1008
1009 fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1010 let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1011 NoopProvider::default(),
1012 Runtime::test(),
1013 EthStateCacheConfig {
1014 max_blocks: 4,
1015 max_receipts: 4,
1016 max_headers: 4,
1017 max_bals: 4,
1018 max_concurrent_db_requests: 1,
1019 max_cached_tx_hashes: 16,
1020 },
1021 );
1022 service
1023 }
1024
1025 fn test_decoded_revm_bal() -> DecodedBal<Arc<RevmBal>> {
1026 DecodedBal::new(Arc::new(RevmBal::default()), Bytes::from_static(&[0xc0]))
1027 }
1028
1029 fn test_block() -> RecoveredBlock<Block> {
1030 RecoveredBlock::new_unhashed(
1031 Block {
1032 header: Header { number: 1, ..Default::default() },
1033 body: BlockBody {
1034 transactions: vec![TransactionSigned::new_unhashed(
1035 Transaction::Legacy(Default::default()),
1036 Signature::test_signature(),
1037 )],
1038 ..Default::default()
1039 },
1040 },
1041 vec![Address::ZERO],
1042 )
1043 }
1044
/// A reorg notification for a hash must evict the header cached under that hash.
#[test]
fn reorg_evicts_cached_headers() {
    let mut service = test_service();
    let block_hash = B256::repeat_byte(0x11);
    let cached_header = Header { number: 42, ..Default::default() };
    let reorged_header = Header { number: 7, ..Default::default() };

    // Seed the cache and make sure the entry is visible.
    assert!(service.headers_cache.insert(block_hash, cached_header));
    assert!(service.headers_cache.get(&block_hash).is_some());

    service.on_reorg_header(block_hash, Ok(reorged_header));

    assert!(service.headers_cache.get(&block_hash).is_none());
}
1059
/// A consumer queued for a header must receive the header passed to the reorg handler.
#[test]
fn reorg_forwards_header_to_queued_requests() {
    let mut service = test_service();
    let block_hash = B256::repeat_byte(0x22);
    let (response_tx, mut response_rx) = oneshot::channel();

    assert!(service.headers_cache.queue(block_hash, response_tx));

    let reorged_header = Header { number: 7, ..Default::default() };
    service.on_reorg_header(block_hash, Ok(reorged_header));

    let delivered = response_rx.try_recv().expect("queued header response");
    let header = delivered.expect("header result");

    assert_eq!(header.number, 7);
}
1076
/// Removing a block's transactions drops their index entries even when the entry points at a
/// different block hash.
#[test]
fn reorg_removes_tx_hash_index_entries_unconditionally() {
    let mut service = test_service();
    let block = test_block();
    let first_tx = block.body().transactions().next().expect("test transaction");
    let tx_hash = *first_tx.tx_hash();

    // Index the transaction under an unrelated block hash.
    let unrelated_block_hash = B256::repeat_byte(0x33);
    service.tx_hash_index.insert(tx_hash, (unrelated_block_hash, 0));

    service.remove_block_transactions(&block);

    assert!(service.tx_hash_index.get(&tx_hash).is_none());
}
1089
/// A reorg notification for a hash must evict the BAL cached under that hash.
#[test]
fn reorg_evicts_cached_bal() {
    let mut service = test_service();
    let block_hash = B256::repeat_byte(0x44);
    let cached_bal = CachedRevmBal::new(test_decoded_revm_bal());

    // Seed the cache and make sure the entry is visible.
    assert!(service.bal_cache.insert(block_hash, cached_bal));
    assert!(service.bal_cache.get(&block_hash).is_some());

    service.on_reorg_bal(block_hash, Ok(None));

    assert!(service.bal_cache.get(&block_hash).is_none());
}
1102
/// A consumer queued for a BAL must receive the BAL passed to the reorg handler.
#[test]
fn reorg_forwards_bal_to_queued_requests() {
    let mut service = test_service();
    let block_hash = B256::repeat_byte(0x55);
    let (response_tx, mut response_rx) = oneshot::channel();

    assert!(service.bal_cache.queue(block_hash, response_tx));

    let reorged_bal = CachedRevmBal::new(test_decoded_revm_bal());
    service.on_reorg_bal(block_hash, Ok(Some(reorged_bal)));

    let delivered = response_rx.try_recv().expect("queued BAL response");
    let bal = delivered.expect("BAL result");

    assert!(bal.is_some());
}
1118
/// The size estimate must exceed a shallow estimate that ignores the nested collections.
#[test]
fn cached_revm_bal_size_accounts_for_nested_allocations() {
    // One account carrying a write of every kind, so every nested collection is non-empty.
    let mut account = RevmAccountBal::default();
    let info = &mut account.account_info;
    info.nonce.writes.push((BlockAccessIndex::new(1), 1));
    info.balance.writes.push((BlockAccessIndex::new(2), StorageValue::from(1u64)));
    info.code.writes.push((
        BlockAccessIndex::new(3),
        (B256::repeat_byte(0xaa), Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]))),
    ));
    account.storage.storage.insert(
        StorageKey::from(1u64),
        RevmBalWrites::new(vec![(BlockAccessIndex::new(4), StorageValue::from(2u64))]),
    );

    let mut bal = RevmBal::default();
    bal.accounts.insert(Address::ZERO, account);

    let raw = Bytes::from_static(&[0xc0, 0x01, 0x02]);

    // Counts only the fixed-size structs and the raw bytes.
    let shallow_estimate = core::mem::size_of::<CachedRevmBal>() +
        core::mem::size_of::<DecodedBal<Arc<RevmBal>>>() +
        raw.len() +
        core::mem::size_of::<RevmBal>();

    let cached = CachedRevmBal::new(DecodedBal::new(Arc::new(bal), raw));
    assert!(cached.size() > shallow_estimate);
}
1147
1148 #[tokio::test]
1149 async fn get_bal_uses_cached_revm_bal() {
1150 let fetches = Arc::new(AtomicUsize::default());
1151 let provider = TestBalProvider::new(fetches.clone());
1152 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1153 provider,
1154 EthStateCacheConfig {
1155 max_blocks: 0,
1156 max_receipts: 0,
1157 max_headers: 0,
1158 max_bals: 4,
1159 max_concurrent_db_requests: 1,
1160 max_cached_tx_hashes: 0,
1161 },
1162 Runtime::test(),
1163 );
1164 let block_hash = B256::repeat_byte(0x66);
1165
1166 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1167 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1168
1169 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1170 }
1171
1172 #[tokio::test]
1173 async fn concurrent_get_bal_requests_share_fetch() {
1174 let fetches = Arc::new(AtomicUsize::default());
1175 let provider = TestBalProvider::new(fetches.clone());
1176 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1177 provider,
1178 EthStateCacheConfig {
1179 max_blocks: 0,
1180 max_receipts: 0,
1181 max_headers: 0,
1182 max_bals: 4,
1183 max_concurrent_db_requests: 1,
1184 max_cached_tx_hashes: 0,
1185 },
1186 Runtime::test(),
1187 );
1188 let block_hash = B256::repeat_byte(0x77);
1189
1190 let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1191
1192 assert!(first.unwrap().is_some());
1193 assert!(second.unwrap().is_some());
1194 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1195 }
1196
1197 #[derive(Clone, Debug, Default)]
1198 struct TestBalProvider {
1199 bal_store: BalStoreHandle,
1200 }
1201
1202 impl TestBalProvider {
1203 fn new(fetches: Arc<AtomicUsize>) -> Self {
1204 Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1205 }
1206 }
1207
1208 impl BalProvider for TestBalProvider {
1209 fn bal_store(&self) -> &BalStoreHandle {
1210 &self.bal_store
1211 }
1212 }
1213
1214 #[derive(Debug)]
1215 struct TestBalStore {
1216 fetches: Arc<AtomicUsize>,
1217 }
1218
1219 impl BalStore for TestBalStore {
1220 fn insert(&self, _num_hash: NumHash, _bal: reth_storage_api::RawBal) -> ProviderResult<()> {
1221 Ok(())
1222 }
1223
1224 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1225 Ok(0)
1226 }
1227
1228 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1229 Ok(block_hashes.iter().map(|_| None).collect())
1230 }
1231
1232 fn revm_bal_by_hash(
1233 &self,
1234 _block_hash: BlockHash,
1235 ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
1236 self.fetches.fetch_add(1, Ordering::SeqCst);
1237 Ok(Some(test_decoded_revm_bal()))
1238 }
1239
1240 fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1241 reth_storage_api::NoopBalStore.bal_stream()
1242 }
1243 }
1244
1245 impl BlockHashReader for TestBalProvider {
1246 fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
1247 Ok(None)
1248 }
1249
1250 fn canonical_hashes_range(
1251 &self,
1252 _start: BlockNumber,
1253 _end: BlockNumber,
1254 ) -> ProviderResult<Vec<B256>> {
1255 Ok(Vec::new())
1256 }
1257 }
1258
1259 impl BlockNumReader for TestBalProvider {
1260 fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
1261 Ok(reth_chainspec::ChainInfo::default())
1262 }
1263
1264 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1265 Ok(0)
1266 }
1267
1268 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1269 Ok(0)
1270 }
1271
1272 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1273 Ok(None)
1274 }
1275 }
1276
1277 impl HeaderProvider for TestBalProvider {
1278 type Header = Header;
1279
1280 fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1281 Ok(None)
1282 }
1283
1284 fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
1285 Ok(None)
1286 }
1287
1288 fn headers_range(
1289 &self,
1290 _range: impl RangeBounds<BlockNumber>,
1291 ) -> ProviderResult<Vec<Self::Header>> {
1292 Ok(Vec::new())
1293 }
1294
1295 fn sealed_header(
1296 &self,
1297 _number: BlockNumber,
1298 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1299 Ok(None)
1300 }
1301
1302 fn sealed_headers_while(
1303 &self,
1304 _range: impl RangeBounds<BlockNumber>,
1305 _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1306 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1307 Ok(Vec::new())
1308 }
1309 }
1310
1311 impl BlockBodyIndicesProvider for TestBalProvider {
1312 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1313 Ok(None)
1314 }
1315
1316 fn block_body_indices_range(
1317 &self,
1318 _range: RangeInclusive<BlockNumber>,
1319 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1320 Ok(Vec::new())
1321 }
1322 }
1323
1324 impl TransactionsProvider for TestBalProvider {
1325 type Transaction = TransactionSigned;
1326
1327 fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1328 Ok(None)
1329 }
1330
1331 fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1332 Ok(None)
1333 }
1334
1335 fn transaction_by_id_unhashed(
1336 &self,
1337 _id: TxNumber,
1338 ) -> ProviderResult<Option<Self::Transaction>> {
1339 Ok(None)
1340 }
1341
1342 fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1343 Ok(None)
1344 }
1345
1346 fn transaction_by_hash_with_meta(
1347 &self,
1348 _hash: TxHash,
1349 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1350 Ok(None)
1351 }
1352
1353 fn transactions_by_block(
1354 &self,
1355 _block: BlockHashOrNumber,
1356 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1357 Ok(None)
1358 }
1359
1360 fn transactions_by_block_range(
1361 &self,
1362 _range: impl RangeBounds<BlockNumber>,
1363 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1364 Ok(Vec::new())
1365 }
1366
1367 fn transactions_by_tx_range(
1368 &self,
1369 _range: impl RangeBounds<TxNumber>,
1370 ) -> ProviderResult<Vec<Self::Transaction>> {
1371 Ok(Vec::new())
1372 }
1373
1374 fn senders_by_tx_range(
1375 &self,
1376 _range: impl RangeBounds<TxNumber>,
1377 ) -> ProviderResult<Vec<Address>> {
1378 Ok(Vec::new())
1379 }
1380
1381 fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
1382 Ok(None)
1383 }
1384 }
1385
1386 impl ReceiptProvider for TestBalProvider {
1387 type Receipt = Receipt;
1388
1389 fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1390 Ok(None)
1391 }
1392
1393 fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1394 Ok(None)
1395 }
1396
1397 fn receipts_by_block(
1398 &self,
1399 _block: BlockHashOrNumber,
1400 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1401 Ok(None)
1402 }
1403
1404 fn receipts_by_tx_range(
1405 &self,
1406 _range: impl RangeBounds<TxNumber>,
1407 ) -> ProviderResult<Vec<Self::Receipt>> {
1408 Ok(Vec::new())
1409 }
1410
1411 fn receipts_by_block_range(
1412 &self,
1413 _block_range: RangeInclusive<BlockNumber>,
1414 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1415 Ok(Vec::new())
1416 }
1417 }
1418
1419 impl BlockReader for TestBalProvider {
1420 type Block = Block;
1421
1422 fn find_block_by_hash(
1423 &self,
1424 _hash: B256,
1425 _source: BlockSource,
1426 ) -> ProviderResult<Option<Self::Block>> {
1427 Ok(None)
1428 }
1429
1430 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1431 Ok(None)
1432 }
1433
1434 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1435 Ok(None)
1436 }
1437
1438 fn pending_block_and_receipts(
1439 &self,
1440 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1441 Ok(None)
1442 }
1443
1444 fn recovered_block(
1445 &self,
1446 _id: BlockHashOrNumber,
1447 _transaction_kind: TransactionVariant,
1448 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1449 Ok(None)
1450 }
1451
1452 fn sealed_block_with_senders(
1453 &self,
1454 _id: BlockHashOrNumber,
1455 _transaction_kind: TransactionVariant,
1456 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1457 Ok(None)
1458 }
1459
1460 fn block_range(
1461 &self,
1462 _range: RangeInclusive<BlockNumber>,
1463 ) -> ProviderResult<Vec<Self::Block>> {
1464 Ok(Vec::new())
1465 }
1466
1467 fn block_with_senders_range(
1468 &self,
1469 _range: RangeInclusive<BlockNumber>,
1470 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1471 Ok(Vec::new())
1472 }
1473
1474 fn recovered_block_range(
1475 &self,
1476 _range: RangeInclusive<BlockNumber>,
1477 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1478 Ok(Vec::new())
1479 }
1480
1481 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1482 Ok(None)
1483 }
1484 }
1485}