1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eip7928::bal::DecodedBal;
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::{Address, TxHash, B256};
9use futures::{stream::FuturesOrdered, Stream, StreamExt};
10use reth_chain_state::CanonStateNotification;
11use reth_errors::{ProviderError, ProviderResult};
12use reth_execution_types::Chain;
13use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
14use reth_revm::{
15 bytecode::Bytecode,
16 primitives::{StorageKey, StorageValue},
17 state::bal::{
18 AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
19 BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
20 },
21};
22use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
23use reth_tasks::Runtime;
24use schnellru::{ByLength, Limiter, LruMap};
25use std::{
26 future::Future,
27 pin::Pin,
28 sync::Arc,
29 task::{Context, Poll},
30};
31use tokio::sync::{
32 mpsc::{unbounded_channel, UnboundedSender},
33 oneshot, Semaphore,
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36
37pub mod config;
38pub mod db;
39pub mod metrics;
40pub mod multi_consumer;
41
/// Oneshot sender used to answer a recovered-block request with the provider result.
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// Oneshot sender used to answer a receipts request with the provider result.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// Oneshot sender used to answer a cache-only block lookup (no provider result involved).
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// Oneshot sender used to answer a cache-only lookup of a block and its receipts.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// Oneshot sender used to answer a header request; the header itself is not optional.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// Oneshot sender used to answer a request for a run of cached blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// Oneshot sender used to answer a cache-only transaction lookup by hash.
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// Oneshot sender used to answer a BAL request with the provider result.
type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;

/// Block cache keyed by block hash; pending consumers are [`BlockWithSendersResponseSender`]s.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// Receipts cache keyed by block hash; pending consumers are [`ReceiptsResponseSender`]s.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// Header cache keyed by block hash; pending consumers are [`HeaderResponseSender`]s.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;

/// BAL cache keyed by block hash; pending consumers are [`BalResponseSender`]s.
type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
75
/// Frontend handle for the cache service.
///
/// The handle only owns the sending half of the unbounded action channel: every request method
/// enqueues a `CacheAction` and awaits the answer on a oneshot channel.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Sending half of the channel the `EthStateCacheService` reads its actions from.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
84
85impl<N: NodePrimitives> Clone for EthStateCache<N> {
86 fn clone(&self) -> Self {
87 Self { to_service: self.to_service.clone() }
88 }
89}
90
91impl<N: NodePrimitives> EthStateCache<N> {
92 fn create<Provider>(
94 provider: Provider,
95 action_task_spawner: Runtime,
96 config: EthStateCacheConfig,
97 ) -> (Self, EthStateCacheService<Provider, Runtime>)
98 where
99 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
100 {
101 let EthStateCacheConfig {
102 max_blocks,
103 max_receipts,
104 max_headers,
105 max_bals,
106 max_concurrent_db_requests,
107 max_cached_tx_hashes,
108 } = config;
109 let (to_service, rx) = unbounded_channel();
110
111 let service = EthStateCacheService {
112 provider,
113 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
114 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
115 headers_cache: HeaderLruCache::new(max_headers, "headers"),
116 bal_cache: BalLruCache::new(max_bals, "bals"),
117 action_tx: to_service.clone(),
118 action_rx: UnboundedReceiverStream::new(rx),
119 action_task_spawner,
120 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
121 tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
122 };
123 let cache = Self { to_service };
124 (cache, service)
125 }
126
127 pub fn spawn_with<Provider>(
132 provider: Provider,
133 config: EthStateCacheConfig,
134 executor: Runtime,
135 ) -> Self
136 where
137 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
138 + BalProvider
139 + Clone
140 + Unpin
141 + 'static,
142 {
143 let (this, service) = Self::create(provider, executor.clone(), config);
144 executor.spawn_critical_task("eth state cache", service);
145 this
146 }
147
148 pub async fn get_recovered_block(
152 &self,
153 block_hash: B256,
154 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
155 let (response_tx, rx) = oneshot::channel();
156 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
157 rx.await.map_err(|_| CacheServiceUnavailable)?
158 }
159
160 pub async fn get_receipts(
164 &self,
165 block_hash: B256,
166 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
167 let (response_tx, rx) = oneshot::channel();
168 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
169 rx.await.map_err(|_| CacheServiceUnavailable)?
170 }
171
172 pub async fn get_block_and_receipts(
174 &self,
175 block_hash: B256,
176 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
177 let block = self.get_recovered_block(block_hash);
178 let receipts = self.get_receipts(block_hash);
179
180 let (block, receipts) = futures::try_join!(block, receipts)?;
181
182 Ok(block.zip(receipts))
183 }
184
185 pub async fn get_receipts_and_maybe_block(
187 &self,
188 block_hash: B256,
189 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
190 let (response_tx, rx) = oneshot::channel();
191 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
192
193 let receipts = self.get_receipts(block_hash);
194
195 let (receipts, block) = futures::join!(receipts, rx);
196
197 let block = block.map_err(|_| CacheServiceUnavailable)?;
198 Ok(receipts?.map(|r| (r, block)))
199 }
200
201 pub async fn maybe_cached_block_and_receipts(
203 &self,
204 block_hash: B256,
205 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
206 let (response_tx, rx) = oneshot::channel();
207 let _ = self
208 .to_service
209 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
210 rx.await.map_err(|_| CacheServiceUnavailable.into())
211 }
212
213 #[expect(clippy::type_complexity)]
215 pub fn get_receipts_and_maybe_block_stream<'a>(
216 &'a self,
217 hashes: Vec<B256>,
218 ) -> impl Stream<
219 Item = ProviderResult<
220 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
221 >,
222 > + 'a {
223 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
224
225 futures.collect::<FuturesOrdered<_>>()
226 }
227
228 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
232 let (response_tx, rx) = oneshot::channel();
233 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
234 rx.await.map_err(|_| CacheServiceUnavailable)?
235 }
236
237 pub async fn get_cached_parent_blocks(
245 &self,
246 block_hash: B256,
247 max_blocks: usize,
248 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
249 let (response_tx, rx) = oneshot::channel();
250 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
251 block_hash,
252 max_blocks,
253 response_tx,
254 });
255
256 let blocks = rx.await.unwrap_or_default();
257 if blocks.is_empty() {
258 None
259 } else {
260 Some(blocks)
261 }
262 }
263
264 pub async fn get_transaction_by_hash(
269 &self,
270 tx_hash: TxHash,
271 ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
272 let (response_tx, rx) = oneshot::channel();
273 let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
274 rx.await.ok()?
275 }
276
277 pub async fn get_bal(
281 &self,
282 block_hash: B256,
283 ) -> ProviderResult<Option<Arc<DecodedBal<RevmBal>>>> {
284 let (response_tx, rx) = oneshot::channel();
285 let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
286 rx.await
287 .map_err(|_| CacheServiceUnavailable)?
288 .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
289 }
290}
/// Marker error reported when a request to the cache service cannot be answered, e.g. because
/// the oneshot reply channel was dropped before a response was sent.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;
295
// Lets `?` and `.into()` turn the marker error into a `ProviderError` in the frontend methods.
impl From<CacheServiceUnavailable> for ProviderError {
    /// Wraps the marker error via [`ProviderError::other`].
    fn from(err: CacheServiceUnavailable) -> Self {
        Self::other(err)
    }
}
301
/// Service that owns the caches and processes `CacheAction`s received over its channel.
///
/// It implements [`Future`] (for the `Runtime` task spawner) and must be spawned to make
/// progress. On a cache miss it spawns a task that reads from `provider` and reports the result
/// back through `action_tx`.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
    LimitBals = ByLength,
> where
    Provider: BlockReader + BalProvider,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
    LimitBals: Limiter<B256, CachedRevmBal>,
{
    /// Source of blocks, receipts, headers and BALs on a cache miss.
    provider: Provider,
    /// Cache of recovered blocks, keyed by block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// Cache of receipts, keyed by block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// Cache of headers, keyed by block hash.
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Cache of decoded BALs, keyed by block hash.
    bal_cache: BalLruCache<LimitBals>,
    /// Sender half of the action channel; cloned into fetch tasks so they can report results.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel, polled by the service.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Spawner used for the provider fetch tasks.
    action_task_spawner: Tasks,
    /// Semaphore that fetch tasks acquire a permit from before querying the provider.
    rate_limiter: Arc<Semaphore>,
    /// Index from transaction hash to `(block hash, index of the transaction in that block)`.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
359
360impl<Provider> EthStateCacheService<Provider, Runtime>
361where
362 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
363{
364 fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
366 let block_hash = block.hash();
367 for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
368 self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
369 }
370 }
371
372 fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
374 for tx in block.body().transactions() {
375 self.tx_hash_index.remove(tx.tx_hash());
376 }
377 }
378
379 fn on_new_block(
380 &mut self,
381 block_hash: B256,
382 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
383 ) {
384 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
385 for tx in queued {
387 let _ = tx.send(res.clone());
388 }
389 }
390
391 if let Ok(Some(block)) = res {
393 self.full_block_cache.insert(block_hash, block);
394 }
395 }
396
397 fn on_new_receipts(
398 &mut self,
399 block_hash: B256,
400 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
401 ) {
402 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
403 for tx in queued {
405 let _ = tx.send(res.clone());
406 }
407 }
408
409 if let Ok(Some(receipts)) = res {
411 self.receipts_cache.insert(block_hash, receipts);
412 }
413 }
414
415 fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
416 if let Some(queued) = self.bal_cache.remove(&block_hash) {
417 for tx in queued {
418 let _ = tx.send(res.clone());
419 }
420 }
421
422 if let Ok(Some(bal)) = res {
423 self.bal_cache.insert(block_hash, bal);
424 }
425 }
426
427 fn on_reorg_block(
428 &mut self,
429 block_hash: B256,
430 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
431 ) {
432 let res = res.map(|b| b.map(Arc::new));
433 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
434 for tx in queued {
436 let _ = tx.send(res.clone());
437 }
438 }
439 }
440
441 fn on_reorg_receipts(
442 &mut self,
443 block_hash: B256,
444 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
445 ) {
446 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
447 for tx in queued {
449 let _ = tx.send(res.clone());
450 }
451 }
452 }
453
454 fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
455 if let Some(queued) = self.headers_cache.remove(&block_hash) {
456 for tx in queued {
458 let _ = tx.send(res.clone());
459 }
460 }
461 }
462
463 fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
464 if let Some(queued) = self.bal_cache.remove(&block_hash) {
465 for tx in queued {
466 let _ = tx.send(res.clone());
467 }
468 }
469 }
470
471 fn shrink_queues(&mut self) {
473 let min_capacity = 2;
474 self.full_block_cache.shrink_to(min_capacity);
475 self.receipts_cache.shrink_to(min_capacity);
476 self.headers_cache.shrink_to(min_capacity);
477 self.bal_cache.shrink_to(min_capacity);
478 }
479
480 fn update_cached_metrics(&self) {
481 self.full_block_cache.update_cached_metrics();
482 self.receipts_cache.update_cached_metrics();
483 self.headers_cache.update_cached_metrics();
484 self.bal_cache.update_cached_metrics();
485 }
486}
487
488impl<Provider> Future for EthStateCacheService<Provider, Runtime>
489where
490 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
491{
492 type Output = ();
493
494 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
495 let this = self.get_mut();
496
497 loop {
498 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
499 this.shrink_queues();
501 return Poll::Pending;
502 };
503
504 match action {
505 None => {
506 unreachable!("can't close")
507 }
508 Some(action) => {
509 match action {
510 CacheAction::GetCachedBlock { block_hash, response_tx } => {
511 let _ =
512 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
513 }
514 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
515 let block = this.full_block_cache.get(&block_hash).cloned();
516 let receipts = this.receipts_cache.get(&block_hash).cloned();
517 let _ = response_tx.send((block, receipts));
518 }
519 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
520 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
521 let _ = response_tx.send(Ok(Some(block)));
522 continue
523 }
524
525 if this.full_block_cache.queue(block_hash, response_tx) {
527 let provider = this.provider.clone();
528 let action_tx = this.action_tx.clone();
529 let rate_limiter = this.rate_limiter.clone();
530 let mut action_sender =
531 ActionSender::new(CacheKind::Block, block_hash, action_tx);
532 this.action_task_spawner.spawn_blocking_task(async move {
533 let _permit = rate_limiter.acquire().await;
535 let block_sender = provider
538 .sealed_block_with_senders(
539 BlockHashOrNumber::Hash(block_hash),
540 TransactionVariant::WithHash,
541 )
542 .map(|maybe_block| maybe_block.map(Arc::new));
543 action_sender.send_block(block_sender);
544 });
545 }
546 }
547 CacheAction::GetReceipts { block_hash, response_tx } => {
548 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
550 let _ = response_tx.send(Ok(Some(receipts)));
551 continue
552 }
553
554 if this.receipts_cache.queue(block_hash, response_tx) {
556 let provider = this.provider.clone();
557 let action_tx = this.action_tx.clone();
558 let rate_limiter = this.rate_limiter.clone();
559 let mut action_sender =
560 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
561 this.action_task_spawner.spawn_blocking_task(async move {
562 let _permit = rate_limiter.acquire().await;
564 let res = provider
565 .receipts_by_block(block_hash.into())
566 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
567
568 action_sender.send_receipts(res);
569 });
570 }
571 }
572 CacheAction::GetHeader { block_hash, response_tx } => {
573 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
575 let _ = response_tx.send(Ok(header));
576 continue
577 }
578
579 if let Some(block) = this.full_block_cache.get(&block_hash) {
581 let _ = response_tx.send(Ok(block.clone_header()));
582 continue
583 }
584
585 if this.headers_cache.queue(block_hash, response_tx) {
588 let provider = this.provider.clone();
589 let action_tx = this.action_tx.clone();
590 let rate_limiter = this.rate_limiter.clone();
591 let mut action_sender =
592 ActionSender::new(CacheKind::Header, block_hash, action_tx);
593 this.action_task_spawner.spawn_blocking_task(async move {
594 let _permit = rate_limiter.acquire().await;
596 let header = provider.header(block_hash).and_then(|header| {
597 header.ok_or_else(|| {
598 ProviderError::HeaderNotFound(block_hash.into())
599 })
600 });
601 action_sender.send_header(header);
602 });
603 }
604 }
605 CacheAction::GetBal { block_hash, response_tx } => {
606 if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
607 let _ = response_tx.send(Ok(Some(bal)));
608 continue
609 }
610
611 if this.bal_cache.queue(block_hash, response_tx) {
612 let provider = this.provider.clone();
613 let action_tx = this.action_tx.clone();
614 let rate_limiter = this.rate_limiter.clone();
615 let mut action_sender =
616 ActionSender::new(CacheKind::Bal, block_hash, action_tx);
617 this.action_task_spawner.spawn_blocking_task(async move {
618 let _permit = rate_limiter.acquire().await;
619 let res = provider
620 .bal_store()
621 .revm_bal_by_hash(block_hash)
622 .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
623 action_sender.send_bal(res);
624 });
625 }
626 }
627 CacheAction::ReceiptsResult { block_hash, res } => {
628 this.on_new_receipts(block_hash, res);
629 }
630 CacheAction::BalResult { block_hash, res } => {
631 this.on_new_bal(block_hash, res);
632 }
633 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
634 Ok(Some(block_with_senders)) => {
635 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
636 }
637 Ok(None) => {
638 this.on_new_block(block_hash, Ok(None));
639 }
640 Err(e) => {
641 this.on_new_block(block_hash, Err(e));
642 }
643 },
644 CacheAction::HeaderResult { block_hash, res } => {
645 let res = *res;
646 if let Some(queued) = this.headers_cache.remove(&block_hash) {
647 for tx in queued {
649 let _ = tx.send(res.clone());
650 }
651 }
652
653 if let Ok(data) = res {
655 this.headers_cache.insert(block_hash, data);
656 }
657 }
658 CacheAction::CacheNewCanonicalChain { chain_change } => {
659 for block in chain_change.blocks {
660 this.index_block_transactions(&block);
662 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
663 }
664
665 for block_receipts in chain_change.receipts {
666 this.on_new_receipts(
667 block_receipts.block_hash,
668 Ok(Some(Arc::new(block_receipts.receipts))),
669 );
670 }
671 }
672 CacheAction::RemoveReorgedChain { chain_change } => {
673 for block in chain_change.blocks {
674 let block_hash = block.hash();
675 let header = block.clone_header();
676 this.remove_block_transactions(&block);
678 this.on_reorg_block(block_hash, Ok(Some(block)));
679 this.on_reorg_header(block_hash, Ok(header));
680 this.on_reorg_bal(block_hash, Ok(None));
681 }
682
683 for block_receipts in chain_change.receipts {
684 this.on_reorg_receipts(
685 block_receipts.block_hash,
686 Ok(Some(Arc::new(block_receipts.receipts))),
687 );
688 }
689 }
690 CacheAction::GetCachedParentBlocks {
691 block_hash,
692 max_blocks,
693 response_tx,
694 } => {
695 let mut blocks = Vec::new();
696 let mut current_hash = block_hash;
697
698 while blocks.len() < max_blocks {
700 if let Some(block) =
701 this.full_block_cache.get(¤t_hash).cloned()
702 {
703 current_hash = block.header().parent_hash();
705 blocks.push(block);
706 } else {
707 break;
709 }
710 }
711
712 let _ = response_tx.send(blocks);
713 }
714 CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
715 let result =
716 this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
717 let block = this.full_block_cache.get(block_hash).cloned()?;
718 let receipts = this.receipts_cache.get(block_hash).cloned();
719 Some(CachedTransaction::new(block, *idx, receipts))
720 });
721 let _ = response_tx.send(result);
722 }
723 };
724 this.update_cached_metrics();
725 }
726 }
727 }
728 }
729}
730
/// All messages processed by the cache service.
///
/// `Get*` variants are requests carrying a oneshot sender for the reply, `*Result` variants
/// carry the outcome of a spawned provider fetch, and the chain variants carry updates derived
/// from canonical state notifications.
enum CacheAction<B: Block, R> {
    /// Request the recovered block, fetching it from the provider on a cache miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request the header, fetching it from the provider on a cache miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request the receipts, fetching them from the provider on a cache miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Request the BAL, fetching it from the provider on a cache miss.
    GetBal {
        block_hash: B256,
        response_tx: BalResponseSender,
    },
    /// Request the block only if it is currently cached.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Request the block and receipts only as far as they are currently cached.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Outcome of a block fetch.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Outcome of a receipts fetch.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Outcome of a header fetch (the result is boxed).
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Outcome of a BAL fetch.
    BalResult {
        block_hash: B256,
        res: ProviderResult<Option<CachedRevmBal>>,
    },
    /// Blocks and receipts of a newly committed chain segment to insert into the caches.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Blocks and receipts of a reverted chain segment to evict from the caches.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Request up to `max_blocks` cached blocks, starting at `block_hash` and following parent
    /// hashes.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Request a transaction by hash using only cached data.
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
790
/// Receipts of a single block, tagged with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// The block's receipts.
    receipts: Vec<R>,
}
795
/// Owned copy of the blocks and per-block receipts of a chain segment, as sent to the service.
struct ChainChange<B: Block, R> {
    /// Blocks of the segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// Receipts of the segment, one entry per block.
    receipts: Vec<BlockReceipts<R>>,
}
801
802impl<B: Block, R: Clone> ChainChange<B, R> {
803 fn new<N>(chain: Arc<Chain<N>>) -> Self
804 where
805 N: NodePrimitives<Block = B, Receipt = R>,
806 {
807 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
808 .blocks_and_receipts()
809 .map(|(block, receipts)| {
810 let block_receipts =
811 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
812 (block.clone(), block_receipts)
813 })
814 .unzip();
815 Self { blocks, receipts }
816 }
817}
818
/// Identifies which cache a pending fetch belongs to, so that [`ActionSender`] can build the
/// matching `*Result` action when it is dropped without having sent one.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// Recovered block fetch.
    Block,
    /// Receipts fetch.
    Receipt,
    /// Header fetch.
    Header,
    /// BAL fetch.
    Bal,
}
827
/// Guard that sends exactly one `*Result` action for a fetch back to the service.
///
/// Each `send_*` method takes the sender out of `tx`; if none was called before the guard is
/// dropped, the `Drop` impl sends an error result for `kind` instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which kind of result to report on drop.
    kind: CacheKind,
    /// Hash of the block the fetch is for.
    blockhash: B256,
    /// Channel back to the service; `None` once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
838
839impl<R: Send + Sync, B: Block> ActionSender<B, R> {
840 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
841 Self { kind, blockhash, tx: Some(tx) }
842 }
843
844 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
845 if let Some(tx) = self.tx.take() {
846 let _ = tx.send(CacheAction::BlockWithSendersResult {
847 block_hash: self.blockhash,
848 res: block_sender,
849 });
850 }
851 }
852
853 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
854 if let Some(tx) = self.tx.take() {
855 let _ =
856 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
857 }
858 }
859
860 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
861 if let Some(tx) = self.tx.take() {
862 let _ = tx.send(CacheAction::HeaderResult {
863 block_hash: self.blockhash,
864 res: Box::new(header),
865 });
866 }
867 }
868
869 fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
870 if let Some(tx) = self.tx.take() {
871 let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
872 }
873 }
874}
875impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
876 fn drop(&mut self) {
877 if let Some(tx) = self.tx.take() {
878 let msg = match self.kind {
879 CacheKind::Block => CacheAction::BlockWithSendersResult {
880 block_hash: self.blockhash,
881 res: Err(CacheServiceUnavailable.into()),
882 },
883 CacheKind::Receipt => CacheAction::ReceiptsResult {
884 block_hash: self.blockhash,
885 res: Err(CacheServiceUnavailable.into()),
886 },
887 CacheKind::Header => CacheAction::HeaderResult {
888 block_hash: self.blockhash,
889 res: Box::new(Err(CacheServiceUnavailable.into())),
890 },
891 CacheKind::Bal => CacheAction::BalResult {
892 block_hash: self.blockhash,
893 res: Err(CacheServiceUnavailable.into()),
894 },
895 };
896 let _ = tx.send(msg);
897 }
898 }
899}
900
901pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
906 eth_state_cache: EthStateCache<N>,
907 mut events: St,
908) where
909 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
910{
911 while let Some(event) = events.next().await {
912 if let Some(reverted) = event.reverted() {
913 let chain_change = ChainChange::new(reverted);
914
915 let _ =
916 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
917 }
918
919 let chain_change = ChainChange::new(event.committed());
920
921 let _ =
922 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
923 }
924}
925
/// Cache entry wrapping a shared decoded BAL; cloning only clones the [`Arc`].
#[derive(Clone, Debug)]
pub(crate) struct CachedRevmBal(Arc<DecodedBal<RevmBal>>);
929
930impl CachedRevmBal {
931 #[inline]
933 fn new(bal: DecodedBal<RevmBal>) -> Self {
934 Self(Arc::new(bal))
935 }
936}
937
938impl InMemorySize for CachedRevmBal {
939 fn size(&self) -> usize {
940 core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
941 }
942}
943
944fn decoded_revm_bal_size(bal: &DecodedBal<RevmBal>) -> usize {
945 core::mem::size_of::<DecodedBal<RevmBal>>() + bal.as_raw().len() + revm_bal_size(bal.as_bal())
946}
947
948fn revm_bal_size(bal: &RevmBal) -> usize {
949 core::mem::size_of::<RevmBal>() +
950 bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
951 bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
952}
953
954fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
955 revm_account_info_bal_heap_size(&account.account_info) +
956 revm_storage_bal_heap_size(&account.storage)
957}
958
959fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
960 revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
961 revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
962 revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
963}
964
965fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
966 storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
967 storage
968 .storage
969 .values()
970 .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
971 .sum::<usize>()
972}
973
974fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
975where
976 T: PartialEq + Clone,
977 F: FnMut(&T) -> usize,
978{
979 writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
980 writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
981}
982
983fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
984 bytecode.bytes_ref().len()
985}
986
987#[cfg(test)]
988mod tests {
989 use super::*;
990 use alloy_consensus::{transaction::TransactionMeta, Header};
991 use alloy_eip7928::BlockAccessIndex;
992 use alloy_eips::{BlockHashOrNumber, NumHash};
993 use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
994 use core::ops::{RangeBounds, RangeInclusive};
995 use reth_db_models::StoredBlockBodyIndices;
996 use reth_ethereum_primitives::{
997 Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
998 };
999 use reth_primitives_traits::{RecoveredBlock, SealedHeader};
1000 use reth_storage_api::{
1001 noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1002 BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1003 TransactionVariant, TransactionsProvider,
1004 };
1005 use std::sync::atomic::{AtomicUsize, Ordering};
1006
1007 fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1008 let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1009 NoopProvider::default(),
1010 Runtime::test(),
1011 EthStateCacheConfig {
1012 max_blocks: 4,
1013 max_receipts: 4,
1014 max_headers: 4,
1015 max_bals: 4,
1016 max_concurrent_db_requests: 1,
1017 max_cached_tx_hashes: 16,
1018 },
1019 );
1020 service
1021 }
1022
1023 fn test_decoded_revm_bal() -> DecodedBal<RevmBal> {
1024 DecodedBal::new(RevmBal::default(), Bytes::from_static(&[0xc0]))
1025 }
1026
1027 fn test_block() -> RecoveredBlock<Block> {
1028 RecoveredBlock::new_unhashed(
1029 Block {
1030 header: Header { number: 1, ..Default::default() },
1031 body: BlockBody {
1032 transactions: vec![TransactionSigned::new_unhashed(
1033 Transaction::Legacy(Default::default()),
1034 Signature::test_signature(),
1035 )],
1036 ..Default::default()
1037 },
1038 },
1039 vec![Address::ZERO],
1040 )
1041 }
1042
1043 #[test]
1044 fn reorg_evicts_cached_headers() {
1045 let mut service = test_service();
1046 let block_hash = B256::repeat_byte(0x11);
1047
1048 assert!(service
1049 .headers_cache
1050 .insert(block_hash, Header { number: 42, ..Default::default() }));
1051 assert!(service.headers_cache.get(&block_hash).is_some());
1052
1053 service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
1054
1055 assert!(service.headers_cache.get(&block_hash).is_none());
1056 }
1057
1058 #[test]
1059 fn reorg_forwards_header_to_queued_requests() {
1060 let mut service = test_service();
1061 let block_hash = B256::repeat_byte(0x22);
1062 let (response_tx, mut response_rx) = oneshot::channel();
1063 let header = Header { number: 7, ..Default::default() };
1064
1065 assert!(service.headers_cache.queue(block_hash, response_tx));
1066
1067 service.on_reorg_header(block_hash, Ok(header));
1068
1069 let header =
1070 response_rx.try_recv().expect("queued header response").expect("header result");
1071
1072 assert_eq!(header.number, 7);
1073 }
1074
1075 #[test]
1076 fn reorg_removes_tx_hash_index_entries_unconditionally() {
1077 let mut service = test_service();
1078 let block = test_block();
1079 let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
1080
1081 service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
1082
1083 service.remove_block_transactions(&block);
1084
1085 assert!(service.tx_hash_index.get(&tx_hash).is_none());
1086 }
1087
1088 #[test]
1089 fn reorg_evicts_cached_bal() {
1090 let mut service = test_service();
1091 let block_hash = B256::repeat_byte(0x44);
1092
1093 assert!(service.bal_cache.insert(block_hash, CachedRevmBal::new(test_decoded_revm_bal())));
1094 assert!(service.bal_cache.get(&block_hash).is_some());
1095
1096 service.on_reorg_bal(block_hash, Ok(None));
1097
1098 assert!(service.bal_cache.get(&block_hash).is_none());
1099 }
1100
1101 #[test]
1102 fn reorg_forwards_bal_to_queued_requests() {
1103 let mut service = test_service();
1104 let block_hash = B256::repeat_byte(0x55);
1105 let (response_tx, mut response_rx) = oneshot::channel();
1106 let bal = CachedRevmBal::new(test_decoded_revm_bal());
1107
1108 assert!(service.bal_cache.queue(block_hash, response_tx));
1109
1110 service.on_reorg_bal(block_hash, Ok(Some(bal)));
1111
1112 let bal = response_rx.try_recv().expect("queued BAL response").expect("BAL result");
1113
1114 assert!(bal.is_some());
1115 }
1116
/// Checks that `CachedRevmBal::size` reports more than the shallow `size_of` footprints plus
/// the raw byte length once the nested write lists and the storage map hold entries.
#[test]
fn cached_revm_bal_size_accounts_for_nested_allocations() {
    // One write each for nonce, balance and code, plus a single storage-slot write.
    let mut acct = RevmAccountBal::default();
    let info = &mut acct.account_info;
    info.nonce.writes.push((BlockAccessIndex::new(1), 1));
    info.balance.writes.push((BlockAccessIndex::new(2), StorageValue::from(1u64)));
    let code = Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]));
    info.code.writes.push((BlockAccessIndex::new(3), (B256::repeat_byte(0xaa), code)));

    let slot_writes = vec![(BlockAccessIndex::new(4), StorageValue::from(2u64))];
    acct.storage.storage.insert(StorageKey::from(1u64), RevmBalWrites::new(slot_writes));

    let mut revm_bal = RevmBal::default();
    revm_bal.accounts.insert(Address::ZERO, acct);

    let raw = Bytes::from_static(&[0xc0, 0x01, 0x02]);
    // Shallow estimate: struct footprints plus the raw bytes, nothing from nested collections.
    let shallow_estimate = [
        core::mem::size_of::<CachedRevmBal>(),
        core::mem::size_of::<DecodedBal<RevmBal>>(),
        raw.len(),
        core::mem::size_of::<RevmBal>(),
    ]
    .into_iter()
    .sum::<usize>();

    let cached = CachedRevmBal::new(DecodedBal::new(revm_bal, raw));
    assert!(cached.size() > shallow_estimate);
}
1145
1146 #[tokio::test]
1147 async fn get_bal_uses_cached_revm_bal() {
1148 let fetches = Arc::new(AtomicUsize::default());
1149 let provider = TestBalProvider::new(fetches.clone());
1150 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1151 provider,
1152 EthStateCacheConfig {
1153 max_blocks: 0,
1154 max_receipts: 0,
1155 max_headers: 0,
1156 max_bals: 4,
1157 max_concurrent_db_requests: 1,
1158 max_cached_tx_hashes: 0,
1159 },
1160 Runtime::test(),
1161 );
1162 let block_hash = B256::repeat_byte(0x66);
1163
1164 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1165 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1166
1167 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1168 }
1169
1170 #[tokio::test]
1171 async fn concurrent_get_bal_requests_share_fetch() {
1172 let fetches = Arc::new(AtomicUsize::default());
1173 let provider = TestBalProvider::new(fetches.clone());
1174 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1175 provider,
1176 EthStateCacheConfig {
1177 max_blocks: 0,
1178 max_receipts: 0,
1179 max_headers: 0,
1180 max_bals: 4,
1181 max_concurrent_db_requests: 1,
1182 max_cached_tx_hashes: 0,
1183 },
1184 Runtime::test(),
1185 );
1186 let block_hash = B256::repeat_byte(0x77);
1187
1188 let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1189
1190 assert!(first.unwrap().is_some());
1191 assert!(second.unwrap().is_some());
1192 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1193 }
1194
/// Provider double for the BAL cache tests; its only state is the BAL store handle.
#[derive(Clone, Debug, Default)]
struct TestBalProvider {
    /// Handle handed out by the `BalProvider::bal_store` implementation.
    bal_store: BalStoreHandle,
}
1199
1200 impl TestBalProvider {
1201 fn new(fetches: Arc<AtomicUsize>) -> Self {
1202 Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1203 }
1204 }
1205
1206 impl BalProvider for TestBalProvider {
1207 fn bal_store(&self) -> &BalStoreHandle {
1208 &self.bal_store
1209 }
1210 }
1211
/// BAL store double that counts how many times `revm_bal_by_hash` is called.
#[derive(Debug)]
struct TestBalStore {
    /// Incremented on every `revm_bal_by_hash` call; the tests hold a clone of this counter.
    fetches: Arc<AtomicUsize>,
}
1216
1217 impl BalStore for TestBalStore {
1218 fn insert(
1219 &self,
1220 _num_hash: NumHash,
1221 _bal: reth_storage_api::SealedBal,
1222 ) -> ProviderResult<()> {
1223 Ok(())
1224 }
1225
1226 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1227 Ok(0)
1228 }
1229
1230 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1231 Ok(block_hashes.iter().map(|_| None).collect())
1232 }
1233
1234 fn revm_bal_by_hash(
1235 &self,
1236 _block_hash: BlockHash,
1237 ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
1238 self.fetches.fetch_add(1, Ordering::SeqCst);
1239 Ok(Some(test_decoded_revm_bal()))
1240 }
1241
1242 fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1243 reth_storage_api::NoopBalStore.bal_stream()
1244 }
1245 }
1246
1247 impl BlockHashReader for TestBalProvider {
1248 fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
1249 Ok(None)
1250 }
1251
1252 fn canonical_hashes_range(
1253 &self,
1254 _start: BlockNumber,
1255 _end: BlockNumber,
1256 ) -> ProviderResult<Vec<B256>> {
1257 Ok(Vec::new())
1258 }
1259 }
1260
1261 impl BlockNumReader for TestBalProvider {
1262 fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
1263 Ok(reth_chainspec::ChainInfo::default())
1264 }
1265
1266 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1267 Ok(0)
1268 }
1269
1270 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1271 Ok(0)
1272 }
1273
1274 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1275 Ok(None)
1276 }
1277 }
1278
1279 impl HeaderProvider for TestBalProvider {
1280 type Header = Header;
1281
1282 fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1283 Ok(None)
1284 }
1285
1286 fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
1287 Ok(None)
1288 }
1289
1290 fn headers_range(
1291 &self,
1292 _range: impl RangeBounds<BlockNumber>,
1293 ) -> ProviderResult<Vec<Self::Header>> {
1294 Ok(Vec::new())
1295 }
1296
1297 fn sealed_header(
1298 &self,
1299 _number: BlockNumber,
1300 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1301 Ok(None)
1302 }
1303
1304 fn sealed_headers_while(
1305 &self,
1306 _range: impl RangeBounds<BlockNumber>,
1307 _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1308 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1309 Ok(Vec::new())
1310 }
1311 }
1312
1313 impl BlockBodyIndicesProvider for TestBalProvider {
1314 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1315 Ok(None)
1316 }
1317
1318 fn block_body_indices_range(
1319 &self,
1320 _range: RangeInclusive<BlockNumber>,
1321 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1322 Ok(Vec::new())
1323 }
1324 }
1325
1326 impl TransactionsProvider for TestBalProvider {
1327 type Transaction = TransactionSigned;
1328
1329 fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1330 Ok(None)
1331 }
1332
1333 fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1334 Ok(None)
1335 }
1336
1337 fn transaction_by_id_unhashed(
1338 &self,
1339 _id: TxNumber,
1340 ) -> ProviderResult<Option<Self::Transaction>> {
1341 Ok(None)
1342 }
1343
1344 fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1345 Ok(None)
1346 }
1347
1348 fn transaction_by_hash_with_meta(
1349 &self,
1350 _hash: TxHash,
1351 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1352 Ok(None)
1353 }
1354
1355 fn transactions_by_block(
1356 &self,
1357 _block: BlockHashOrNumber,
1358 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1359 Ok(None)
1360 }
1361
1362 fn transactions_by_block_range(
1363 &self,
1364 _range: impl RangeBounds<BlockNumber>,
1365 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1366 Ok(Vec::new())
1367 }
1368
1369 fn transactions_by_tx_range(
1370 &self,
1371 _range: impl RangeBounds<TxNumber>,
1372 ) -> ProviderResult<Vec<Self::Transaction>> {
1373 Ok(Vec::new())
1374 }
1375
1376 fn senders_by_tx_range(
1377 &self,
1378 _range: impl RangeBounds<TxNumber>,
1379 ) -> ProviderResult<Vec<Address>> {
1380 Ok(Vec::new())
1381 }
1382
1383 fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
1384 Ok(None)
1385 }
1386 }
1387
1388 impl ReceiptProvider for TestBalProvider {
1389 type Receipt = Receipt;
1390
1391 fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1392 Ok(None)
1393 }
1394
1395 fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1396 Ok(None)
1397 }
1398
1399 fn receipts_by_block(
1400 &self,
1401 _block: BlockHashOrNumber,
1402 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1403 Ok(None)
1404 }
1405
1406 fn receipts_by_tx_range(
1407 &self,
1408 _range: impl RangeBounds<TxNumber>,
1409 ) -> ProviderResult<Vec<Self::Receipt>> {
1410 Ok(Vec::new())
1411 }
1412
1413 fn receipts_by_block_range(
1414 &self,
1415 _block_range: RangeInclusive<BlockNumber>,
1416 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1417 Ok(Vec::new())
1418 }
1419 }
1420
1421 impl BlockReader for TestBalProvider {
1422 type Block = Block;
1423
1424 fn find_block_by_hash(
1425 &self,
1426 _hash: B256,
1427 _source: BlockSource,
1428 ) -> ProviderResult<Option<Self::Block>> {
1429 Ok(None)
1430 }
1431
1432 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1433 Ok(None)
1434 }
1435
1436 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1437 Ok(None)
1438 }
1439
1440 fn pending_block_and_receipts(
1441 &self,
1442 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1443 Ok(None)
1444 }
1445
1446 fn recovered_block(
1447 &self,
1448 _id: BlockHashOrNumber,
1449 _transaction_kind: TransactionVariant,
1450 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1451 Ok(None)
1452 }
1453
1454 fn sealed_block_with_senders(
1455 &self,
1456 _id: BlockHashOrNumber,
1457 _transaction_kind: TransactionVariant,
1458 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1459 Ok(None)
1460 }
1461
1462 fn block_range(
1463 &self,
1464 _range: RangeInclusive<BlockNumber>,
1465 ) -> ProviderResult<Vec<Self::Block>> {
1466 Ok(Vec::new())
1467 }
1468
1469 fn block_with_senders_range(
1470 &self,
1471 _range: RangeInclusive<BlockNumber>,
1472 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1473 Ok(Vec::new())
1474 }
1475
1476 fn recovered_block_range(
1477 &self,
1478 _range: RangeInclusive<BlockNumber>,
1479 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1480 Ok(Vec::new())
1481 }
1482
1483 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1484 Ok(None)
1485 }
1486 }
1487}