1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eips::{eip7928::bal::DecodedBal, BlockHashOrNumber};
7use alloy_primitives::{Address, TxHash, B256};
8use futures::{stream::FuturesOrdered, Stream, StreamExt};
9use reth_chain_state::CanonStateNotification;
10use reth_errors::{ProviderError, ProviderResult};
11use reth_execution_types::Chain;
12use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
13use reth_revm::{
14 bytecode::Bytecode,
15 primitives::{StorageKey, StorageValue},
16 state::bal::{
17 AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
18 BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
19 },
20};
21use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
22use reth_tasks::Runtime;
23use schnellru::{ByLength, Limiter, LruMap};
24use std::{
25 future::Future,
26 pin::Pin,
27 sync::Arc,
28 task::{Context, Poll},
29};
30use tokio::sync::{
31 mpsc::{unbounded_channel, UnboundedSender},
32 oneshot, Semaphore,
33};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
36pub mod config;
37pub mod db;
38pub mod metrics;
39pub mod multi_consumer;
40
41type BlockWithSendersResponseSender<B> =
43 oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
44
45type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
47
48type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;
49
50type CachedBlockAndReceiptsResponseSender<B, R> =
51 oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;
52
53type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
55
56type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
58
59type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
61
62type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;
64
65type BlockLruCache<B, L> =
66 MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
67
68type ReceiptsLruCache<R, L> =
69 MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
70
71type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
72
73type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
74
75#[derive(Debug)]
80pub struct EthStateCache<N: NodePrimitives> {
81 to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
82}
83
84impl<N: NodePrimitives> Clone for EthStateCache<N> {
85 fn clone(&self) -> Self {
86 Self { to_service: self.to_service.clone() }
87 }
88}
89
90impl<N: NodePrimitives> EthStateCache<N> {
91 fn create<Provider>(
93 provider: Provider,
94 action_task_spawner: Runtime,
95 config: EthStateCacheConfig,
96 ) -> (Self, EthStateCacheService<Provider, Runtime>)
97 where
98 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
99 {
100 let EthStateCacheConfig {
101 max_blocks,
102 max_receipts,
103 max_headers,
104 max_bals,
105 max_concurrent_db_requests,
106 max_cached_tx_hashes,
107 } = config;
108 let (to_service, rx) = unbounded_channel();
109
110 let service = EthStateCacheService {
111 provider,
112 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
113 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
114 headers_cache: HeaderLruCache::new(max_headers, "headers"),
115 bal_cache: BalLruCache::new(max_bals, "bals"),
116 action_tx: to_service.clone(),
117 action_rx: UnboundedReceiverStream::new(rx),
118 action_task_spawner,
119 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
120 tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
121 };
122 let cache = Self { to_service };
123 (cache, service)
124 }
125
126 pub fn spawn_with<Provider>(
131 provider: Provider,
132 config: EthStateCacheConfig,
133 executor: Runtime,
134 ) -> Self
135 where
136 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
137 + BalProvider
138 + Clone
139 + Unpin
140 + 'static,
141 {
142 let (this, service) = Self::create(provider, executor.clone(), config);
143 executor.spawn_critical_task("eth state cache", service);
144 this
145 }
146
147 pub async fn get_recovered_block(
151 &self,
152 block_hash: B256,
153 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
154 let (response_tx, rx) = oneshot::channel();
155 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
156 rx.await.map_err(|_| CacheServiceUnavailable)?
157 }
158
159 pub async fn get_receipts(
163 &self,
164 block_hash: B256,
165 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
166 let (response_tx, rx) = oneshot::channel();
167 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
168 rx.await.map_err(|_| CacheServiceUnavailable)?
169 }
170
171 pub async fn get_block_and_receipts(
173 &self,
174 block_hash: B256,
175 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
176 let block = self.get_recovered_block(block_hash);
177 let receipts = self.get_receipts(block_hash);
178
179 let (block, receipts) = futures::try_join!(block, receipts)?;
180
181 Ok(block.zip(receipts))
182 }
183
184 pub async fn get_receipts_and_maybe_block(
186 &self,
187 block_hash: B256,
188 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
189 let (response_tx, rx) = oneshot::channel();
190 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
191
192 let receipts = self.get_receipts(block_hash);
193
194 let (receipts, block) = futures::join!(receipts, rx);
195
196 let block = block.map_err(|_| CacheServiceUnavailable)?;
197 Ok(receipts?.map(|r| (r, block)))
198 }
199
200 pub async fn maybe_cached_block_and_receipts(
202 &self,
203 block_hash: B256,
204 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
205 let (response_tx, rx) = oneshot::channel();
206 let _ = self
207 .to_service
208 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
209 rx.await.map_err(|_| CacheServiceUnavailable.into())
210 }
211
212 #[expect(clippy::type_complexity)]
214 pub fn get_receipts_and_maybe_block_stream<'a>(
215 &'a self,
216 hashes: Vec<B256>,
217 ) -> impl Stream<
218 Item = ProviderResult<
219 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
220 >,
221 > + 'a {
222 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
223
224 futures.collect::<FuturesOrdered<_>>()
225 }
226
227 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
231 let (response_tx, rx) = oneshot::channel();
232 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
233 rx.await.map_err(|_| CacheServiceUnavailable)?
234 }
235
236 pub async fn get_cached_parent_blocks(
244 &self,
245 block_hash: B256,
246 max_blocks: usize,
247 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
248 let (response_tx, rx) = oneshot::channel();
249 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
250 block_hash,
251 max_blocks,
252 response_tx,
253 });
254
255 let blocks = rx.await.unwrap_or_default();
256 if blocks.is_empty() {
257 None
258 } else {
259 Some(blocks)
260 }
261 }
262
263 pub async fn get_transaction_by_hash(
268 &self,
269 tx_hash: TxHash,
270 ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
271 let (response_tx, rx) = oneshot::channel();
272 let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
273 rx.await.ok()?
274 }
275
276 pub async fn get_bal(
280 &self,
281 block_hash: B256,
282 ) -> ProviderResult<Option<Arc<DecodedBal<RevmBal>>>> {
283 let (response_tx, rx) = oneshot::channel();
284 let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
285 rx.await
286 .map_err(|_| CacheServiceUnavailable)?
287 .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
288 }
289}
290#[derive(Debug, thiserror::Error)]
292#[error("cache service task stopped")]
293pub struct CacheServiceUnavailable;
294
295impl From<CacheServiceUnavailable> for ProviderError {
296 fn from(err: CacheServiceUnavailable) -> Self {
297 Self::other(err)
298 }
299}
300
301#[must_use = "Type does nothing unless spawned"]
318pub(crate) struct EthStateCacheService<
319 Provider,
320 Tasks,
321 LimitBlocks = ByLength,
322 LimitReceipts = ByLength,
323 LimitHeaders = ByLength,
324 LimitBals = ByLength,
325> where
326 Provider: BlockReader + BalProvider,
327 LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
328 LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
329 LimitHeaders: Limiter<B256, Provider::Header>,
330 LimitBals: Limiter<B256, CachedRevmBal>,
331{
332 provider: Provider,
334 full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
336 receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
338 headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
343 bal_cache: BalLruCache<LimitBals>,
345 action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
347 action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
349 action_task_spawner: Tasks,
351 rate_limiter: Arc<Semaphore>,
355 tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
357}
358
359impl<Provider> EthStateCacheService<Provider, Runtime>
360where
361 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
362{
363 fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
365 let block_hash = block.hash();
366 for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
367 self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
368 }
369 }
370
371 fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
373 for tx in block.body().transactions() {
374 self.tx_hash_index.remove(tx.tx_hash());
375 }
376 }
377
378 fn on_new_block(
379 &mut self,
380 block_hash: B256,
381 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
382 ) {
383 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
384 for tx in queued {
386 let _ = tx.send(res.clone());
387 }
388 }
389
390 if let Ok(Some(block)) = res {
392 self.full_block_cache.insert(block_hash, block);
393 }
394 }
395
396 fn on_new_receipts(
397 &mut self,
398 block_hash: B256,
399 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
400 ) {
401 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
402 for tx in queued {
404 let _ = tx.send(res.clone());
405 }
406 }
407
408 if let Ok(Some(receipts)) = res {
410 self.receipts_cache.insert(block_hash, receipts);
411 }
412 }
413
414 fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
415 if let Some(queued) = self.bal_cache.remove(&block_hash) {
416 for tx in queued {
417 let _ = tx.send(res.clone());
418 }
419 }
420
421 if let Ok(Some(bal)) = res {
422 self.bal_cache.insert(block_hash, bal);
423 }
424 }
425
426 fn on_reorg_block(
427 &mut self,
428 block_hash: B256,
429 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
430 ) {
431 let res = res.map(|b| b.map(Arc::new));
432 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
433 for tx in queued {
435 let _ = tx.send(res.clone());
436 }
437 }
438 }
439
440 fn on_reorg_receipts(
441 &mut self,
442 block_hash: B256,
443 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
444 ) {
445 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
446 for tx in queued {
448 let _ = tx.send(res.clone());
449 }
450 }
451 }
452
453 fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
454 if let Some(queued) = self.headers_cache.remove(&block_hash) {
455 for tx in queued {
457 let _ = tx.send(res.clone());
458 }
459 }
460 }
461
462 fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
463 if let Some(queued) = self.bal_cache.remove(&block_hash) {
464 for tx in queued {
465 let _ = tx.send(res.clone());
466 }
467 }
468 }
469
470 fn shrink_queues(&mut self) {
472 let min_capacity = 2;
473 self.full_block_cache.shrink_to(min_capacity);
474 self.receipts_cache.shrink_to(min_capacity);
475 self.headers_cache.shrink_to(min_capacity);
476 self.bal_cache.shrink_to(min_capacity);
477 }
478
479 fn update_cached_metrics(&self) {
480 self.full_block_cache.update_cached_metrics();
481 self.receipts_cache.update_cached_metrics();
482 self.headers_cache.update_cached_metrics();
483 self.bal_cache.update_cached_metrics();
484 }
485}
486
487impl<Provider> Future for EthStateCacheService<Provider, Runtime>
488where
489 Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
490{
491 type Output = ();
492
493 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
494 let this = self.get_mut();
495
496 loop {
497 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
498 this.shrink_queues();
500 return Poll::Pending;
501 };
502
503 match action {
504 None => {
505 unreachable!("can't close")
506 }
507 Some(action) => {
508 match action {
509 CacheAction::GetCachedBlock { block_hash, response_tx } => {
510 let _ =
511 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
512 }
513 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
514 let block = this.full_block_cache.get(&block_hash).cloned();
515 let receipts = this.receipts_cache.get(&block_hash).cloned();
516 let _ = response_tx.send((block, receipts));
517 }
518 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
519 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
520 let _ = response_tx.send(Ok(Some(block)));
521 continue
522 }
523
524 if this.full_block_cache.queue(block_hash, response_tx) {
526 let provider = this.provider.clone();
527 let action_tx = this.action_tx.clone();
528 let rate_limiter = this.rate_limiter.clone();
529 let mut action_sender =
530 ActionSender::new(CacheKind::Block, block_hash, action_tx);
531 this.action_task_spawner.spawn_blocking_task(async move {
532 let _permit = rate_limiter.acquire().await;
534 let block_sender = provider
537 .sealed_block_with_senders(
538 BlockHashOrNumber::Hash(block_hash),
539 TransactionVariant::WithHash,
540 )
541 .map(|maybe_block| maybe_block.map(Arc::new));
542 action_sender.send_block(block_sender);
543 });
544 }
545 }
546 CacheAction::GetReceipts { block_hash, response_tx } => {
547 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
549 let _ = response_tx.send(Ok(Some(receipts)));
550 continue
551 }
552
553 if this.receipts_cache.queue(block_hash, response_tx) {
555 let provider = this.provider.clone();
556 let action_tx = this.action_tx.clone();
557 let rate_limiter = this.rate_limiter.clone();
558 let mut action_sender =
559 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
560 this.action_task_spawner.spawn_blocking_task(async move {
561 let _permit = rate_limiter.acquire().await;
563 let res = provider
564 .receipts_by_block(block_hash.into())
565 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
566
567 action_sender.send_receipts(res);
568 });
569 }
570 }
571 CacheAction::GetHeader { block_hash, response_tx } => {
572 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
574 let _ = response_tx.send(Ok(header));
575 continue
576 }
577
578 if let Some(block) = this.full_block_cache.get(&block_hash) {
580 let _ = response_tx.send(Ok(block.clone_header()));
581 continue
582 }
583
584 if this.headers_cache.queue(block_hash, response_tx) {
587 let provider = this.provider.clone();
588 let action_tx = this.action_tx.clone();
589 let rate_limiter = this.rate_limiter.clone();
590 let mut action_sender =
591 ActionSender::new(CacheKind::Header, block_hash, action_tx);
592 this.action_task_spawner.spawn_blocking_task(async move {
593 let _permit = rate_limiter.acquire().await;
595 let header = provider.header(block_hash).and_then(|header| {
596 header.ok_or_else(|| {
597 ProviderError::HeaderNotFound(block_hash.into())
598 })
599 });
600 action_sender.send_header(header);
601 });
602 }
603 }
604 CacheAction::GetBal { block_hash, response_tx } => {
605 if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
606 let _ = response_tx.send(Ok(Some(bal)));
607 continue
608 }
609
610 if this.bal_cache.queue(block_hash, response_tx) {
611 let provider = this.provider.clone();
612 let action_tx = this.action_tx.clone();
613 let rate_limiter = this.rate_limiter.clone();
614 let mut action_sender =
615 ActionSender::new(CacheKind::Bal, block_hash, action_tx);
616 this.action_task_spawner.spawn_blocking_task(async move {
617 let _permit = rate_limiter.acquire().await;
618 let res = provider
619 .bal_store()
620 .revm_bal_by_hash(block_hash)
621 .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
622 action_sender.send_bal(res);
623 });
624 }
625 }
626 CacheAction::ReceiptsResult { block_hash, res } => {
627 this.on_new_receipts(block_hash, res);
628 }
629 CacheAction::BalResult { block_hash, res } => {
630 this.on_new_bal(block_hash, res);
631 }
632 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
633 Ok(Some(block_with_senders)) => {
634 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
635 }
636 Ok(None) => {
637 this.on_new_block(block_hash, Ok(None));
638 }
639 Err(e) => {
640 this.on_new_block(block_hash, Err(e));
641 }
642 },
643 CacheAction::HeaderResult { block_hash, res } => {
644 let res = *res;
645 if let Some(queued) = this.headers_cache.remove(&block_hash) {
646 for tx in queued {
648 let _ = tx.send(res.clone());
649 }
650 }
651
652 if let Ok(data) = res {
654 this.headers_cache.insert(block_hash, data);
655 }
656 }
657 CacheAction::CacheNewCanonicalChain { chain_change } => {
658 for block in chain_change.blocks {
659 this.index_block_transactions(&block);
661 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
662 }
663
664 for block_receipts in chain_change.receipts {
665 this.on_new_receipts(
666 block_receipts.block_hash,
667 Ok(Some(Arc::new(block_receipts.receipts))),
668 );
669 }
670 }
671 CacheAction::RemoveReorgedChain { chain_change } => {
672 for block in chain_change.blocks {
673 let block_hash = block.hash();
674 let header = block.clone_header();
675 this.remove_block_transactions(&block);
677 this.on_reorg_block(block_hash, Ok(Some(block)));
678 this.on_reorg_header(block_hash, Ok(header));
679 this.on_reorg_bal(block_hash, Ok(None));
680 }
681
682 for block_receipts in chain_change.receipts {
683 this.on_reorg_receipts(
684 block_receipts.block_hash,
685 Ok(Some(Arc::new(block_receipts.receipts))),
686 );
687 }
688 }
689 CacheAction::GetCachedParentBlocks {
690 block_hash,
691 max_blocks,
692 response_tx,
693 } => {
694 let mut blocks = Vec::new();
695 let mut current_hash = block_hash;
696
697 while blocks.len() < max_blocks {
699 if let Some(block) =
700 this.full_block_cache.get(¤t_hash).cloned()
701 {
702 current_hash = block.header().parent_hash();
704 blocks.push(block);
705 } else {
706 break;
708 }
709 }
710
711 let _ = response_tx.send(blocks);
712 }
713 CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
714 let result =
715 this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
716 let block = this.full_block_cache.get(block_hash).cloned()?;
717 let receipts = this.receipts_cache.get(block_hash).cloned();
718 Some(CachedTransaction::new(block, *idx, receipts))
719 });
720 let _ = response_tx.send(result);
721 }
722 };
723 this.update_cached_metrics();
724 }
725 }
726 }
727 }
728}
729
730enum CacheAction<B: Block, R> {
732 GetBlockWithSenders {
733 block_hash: B256,
734 response_tx: BlockWithSendersResponseSender<B>,
735 },
736 GetHeader {
737 block_hash: B256,
738 response_tx: HeaderResponseSender<B::Header>,
739 },
740 GetReceipts {
741 block_hash: B256,
742 response_tx: ReceiptsResponseSender<R>,
743 },
744 GetBal {
745 block_hash: B256,
746 response_tx: BalResponseSender,
747 },
748 GetCachedBlock {
749 block_hash: B256,
750 response_tx: CachedBlockResponseSender<B>,
751 },
752 GetCachedBlockAndReceipts {
753 block_hash: B256,
754 response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
755 },
756 BlockWithSendersResult {
757 block_hash: B256,
758 res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
759 },
760 ReceiptsResult {
761 block_hash: B256,
762 res: ProviderResult<Option<Arc<Vec<R>>>>,
763 },
764 HeaderResult {
765 block_hash: B256,
766 res: Box<ProviderResult<B::Header>>,
767 },
768 BalResult {
769 block_hash: B256,
770 res: ProviderResult<Option<CachedRevmBal>>,
771 },
772 CacheNewCanonicalChain {
773 chain_change: ChainChange<B, R>,
774 },
775 RemoveReorgedChain {
776 chain_change: ChainChange<B, R>,
777 },
778 GetCachedParentBlocks {
779 block_hash: B256,
780 max_blocks: usize,
781 response_tx: CachedParentBlocksResponseSender<B>,
782 },
783 GetTransactionByHash {
785 tx_hash: TxHash,
786 response_tx: TransactionHashResponseSender<B, R>,
787 },
788}
789
790struct BlockReceipts<R> {
791 block_hash: B256,
792 receipts: Vec<R>,
793}
794
795struct ChainChange<B: Block, R> {
797 blocks: Vec<RecoveredBlock<B>>,
798 receipts: Vec<BlockReceipts<R>>,
799}
800
801impl<B: Block, R: Clone> ChainChange<B, R> {
802 fn new<N>(chain: Arc<Chain<N>>) -> Self
803 where
804 N: NodePrimitives<Block = B, Receipt = R>,
805 {
806 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
807 .blocks_and_receipts()
808 .map(|(block, receipts)| {
809 let block_receipts =
810 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
811 (block.clone(), block_receipts)
812 })
813 .unzip();
814 Self { blocks, receipts }
815 }
816}
817
/// Tells an [`ActionSender`] which kind of result message to emit if it is dropped before
/// a result was sent.
#[derive(Clone, Copy, Debug)]
enum CacheKind {
    /// A recovered block fetch.
    Block,
    /// A receipts fetch.
    Receipt,
    /// A header fetch.
    Header,
    /// A BAL fetch.
    Bal,
}
826
827#[derive(Debug)]
832struct ActionSender<B: Block, R: Send + Sync> {
833 kind: CacheKind,
834 blockhash: B256,
835 tx: Option<UnboundedSender<CacheAction<B, R>>>,
836}
837
838impl<R: Send + Sync, B: Block> ActionSender<B, R> {
839 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
840 Self { kind, blockhash, tx: Some(tx) }
841 }
842
843 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
844 if let Some(tx) = self.tx.take() {
845 let _ = tx.send(CacheAction::BlockWithSendersResult {
846 block_hash: self.blockhash,
847 res: block_sender,
848 });
849 }
850 }
851
852 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
853 if let Some(tx) = self.tx.take() {
854 let _ =
855 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
856 }
857 }
858
859 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
860 if let Some(tx) = self.tx.take() {
861 let _ = tx.send(CacheAction::HeaderResult {
862 block_hash: self.blockhash,
863 res: Box::new(header),
864 });
865 }
866 }
867
868 fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
869 if let Some(tx) = self.tx.take() {
870 let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
871 }
872 }
873}
874impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
875 fn drop(&mut self) {
876 if let Some(tx) = self.tx.take() {
877 let msg = match self.kind {
878 CacheKind::Block => CacheAction::BlockWithSendersResult {
879 block_hash: self.blockhash,
880 res: Err(CacheServiceUnavailable.into()),
881 },
882 CacheKind::Receipt => CacheAction::ReceiptsResult {
883 block_hash: self.blockhash,
884 res: Err(CacheServiceUnavailable.into()),
885 },
886 CacheKind::Header => CacheAction::HeaderResult {
887 block_hash: self.blockhash,
888 res: Box::new(Err(CacheServiceUnavailable.into())),
889 },
890 CacheKind::Bal => CacheAction::BalResult {
891 block_hash: self.blockhash,
892 res: Err(CacheServiceUnavailable.into()),
893 },
894 };
895 let _ = tx.send(msg);
896 }
897 }
898}
899
900pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
905 eth_state_cache: EthStateCache<N>,
906 mut events: St,
907) where
908 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
909{
910 while let Some(event) = events.next().await {
911 if let Some(reverted) = event.reverted() {
912 let chain_change = ChainChange::new(reverted);
913
914 let _ =
915 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
916 }
917
918 let chain_change = ChainChange::new(event.committed());
919
920 let _ =
921 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
922 }
923}
924
925#[derive(Clone, Debug)]
927pub(crate) struct CachedRevmBal(Arc<DecodedBal<RevmBal>>);
928
929impl CachedRevmBal {
930 #[inline]
932 fn new(bal: DecodedBal<RevmBal>) -> Self {
933 Self(Arc::new(bal))
934 }
935}
936
937impl InMemorySize for CachedRevmBal {
938 fn size(&self) -> usize {
939 core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
940 }
941}
942
943fn decoded_revm_bal_size(bal: &DecodedBal<RevmBal>) -> usize {
944 core::mem::size_of::<DecodedBal<RevmBal>>() + bal.as_raw().len() + revm_bal_size(bal.as_bal())
945}
946
947fn revm_bal_size(bal: &RevmBal) -> usize {
948 core::mem::size_of::<RevmBal>() +
949 bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
950 bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
951}
952
953fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
954 revm_account_info_bal_heap_size(&account.account_info) +
955 revm_storage_bal_heap_size(&account.storage)
956}
957
958fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
959 revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
960 revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
961 revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
962}
963
964fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
965 storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
966 storage
967 .storage
968 .values()
969 .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
970 .sum::<usize>()
971}
972
973fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
974where
975 T: PartialEq + Clone,
976 F: FnMut(&T) -> usize,
977{
978 writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
979 writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
980}
981
982fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
983 bytecode.bytes_ref().len()
984}
985
986#[cfg(test)]
987mod tests {
988 use super::*;
989 use alloy_consensus::{transaction::TransactionMeta, Header};
990 use alloy_eips::{BlockHashOrNumber, NumHash};
991 use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
992 use core::ops::{RangeBounds, RangeInclusive};
993 use reth_db_models::StoredBlockBodyIndices;
994 use reth_ethereum_primitives::{
995 Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
996 };
997 use reth_primitives_traits::{RecoveredBlock, SealedHeader};
998 use reth_storage_api::{
999 noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1000 BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1001 TransactionVariant, TransactionsProvider,
1002 };
1003 use std::sync::atomic::{AtomicUsize, Ordering};
1004
1005 fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1006 let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1007 NoopProvider::default(),
1008 Runtime::test(),
1009 EthStateCacheConfig {
1010 max_blocks: 4,
1011 max_receipts: 4,
1012 max_headers: 4,
1013 max_bals: 4,
1014 max_concurrent_db_requests: 1,
1015 max_cached_tx_hashes: 16,
1016 },
1017 );
1018 service
1019 }
1020
1021 fn test_decoded_revm_bal() -> DecodedBal<RevmBal> {
1022 DecodedBal::new(RevmBal::default(), Bytes::from_static(&[0xc0]))
1023 }
1024
1025 fn test_block() -> RecoveredBlock<Block> {
1026 RecoveredBlock::new_unhashed(
1027 Block {
1028 header: Header { number: 1, ..Default::default() },
1029 body: BlockBody {
1030 transactions: vec![TransactionSigned::new_unhashed(
1031 Transaction::Legacy(Default::default()),
1032 Signature::test_signature(),
1033 )],
1034 ..Default::default()
1035 },
1036 },
1037 vec![Address::ZERO],
1038 )
1039 }
1040
1041 #[test]
1042 fn reorg_evicts_cached_headers() {
1043 let mut service = test_service();
1044 let block_hash = B256::repeat_byte(0x11);
1045
1046 assert!(service
1047 .headers_cache
1048 .insert(block_hash, Header { number: 42, ..Default::default() }));
1049 assert!(service.headers_cache.get(&block_hash).is_some());
1050
1051 service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
1052
1053 assert!(service.headers_cache.get(&block_hash).is_none());
1054 }
1055
1056 #[test]
1057 fn reorg_forwards_header_to_queued_requests() {
1058 let mut service = test_service();
1059 let block_hash = B256::repeat_byte(0x22);
1060 let (response_tx, mut response_rx) = oneshot::channel();
1061 let header = Header { number: 7, ..Default::default() };
1062
1063 assert!(service.headers_cache.queue(block_hash, response_tx));
1064
1065 service.on_reorg_header(block_hash, Ok(header));
1066
1067 let header =
1068 response_rx.try_recv().expect("queued header response").expect("header result");
1069
1070 assert_eq!(header.number, 7);
1071 }
1072
1073 #[test]
1074 fn reorg_removes_tx_hash_index_entries_unconditionally() {
1075 let mut service = test_service();
1076 let block = test_block();
1077 let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
1078
1079 service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
1080
1081 service.remove_block_transactions(&block);
1082
1083 assert!(service.tx_hash_index.get(&tx_hash).is_none());
1084 }
1085
1086 #[test]
1087 fn reorg_evicts_cached_bal() {
1088 let mut service = test_service();
1089 let block_hash = B256::repeat_byte(0x44);
1090
1091 assert!(service.bal_cache.insert(block_hash, CachedRevmBal::new(test_decoded_revm_bal())));
1092 assert!(service.bal_cache.get(&block_hash).is_some());
1093
1094 service.on_reorg_bal(block_hash, Ok(None));
1095
1096 assert!(service.bal_cache.get(&block_hash).is_none());
1097 }
1098
1099 #[test]
1100 fn reorg_forwards_bal_to_queued_requests() {
1101 let mut service = test_service();
1102 let block_hash = B256::repeat_byte(0x55);
1103 let (response_tx, mut response_rx) = oneshot::channel();
1104 let bal = CachedRevmBal::new(test_decoded_revm_bal());
1105
1106 assert!(service.bal_cache.queue(block_hash, response_tx));
1107
1108 service.on_reorg_bal(block_hash, Ok(Some(bal)));
1109
1110 let bal = response_rx.try_recv().expect("queued BAL response").expect("BAL result");
1111
1112 assert!(bal.is_some());
1113 }
1114
/// The size reported for a `CachedRevmBal` must exceed the sum of the shallow struct sizes and
/// the raw byte length once the BAL holds account-info and storage writes.
#[test]
fn cached_revm_bal_size_accounts_for_nested_allocations() {
    let mut populated_account = RevmAccountBal::default();

    // One write per account-info field, plus a single storage slot write.
    let info = &mut populated_account.account_info;
    info.nonce.writes.push((1, 1));
    info.balance.writes.push((2, StorageValue::from(1u64)));
    let code = Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]));
    info.code.writes.push((3, (B256::repeat_byte(0xaa), code)));

    let slot_writes = RevmBalWrites::new(vec![(4, StorageValue::from(2u64))]);
    populated_account.storage.storage.insert(StorageKey::from(1u64), slot_writes);

    let mut revm_bal = RevmBal::default();
    revm_bal.accounts.insert(Address::ZERO, populated_account);

    let raw_bytes = Bytes::from_static(&[0xc0, 0x01, 0x02]);

    // Estimate that ignores everything stored inside the BAL's nested collections.
    let shallow_estimate: usize = [
        core::mem::size_of::<CachedRevmBal>(),
        core::mem::size_of::<DecodedBal<RevmBal>>(),
        raw_bytes.len(),
        core::mem::size_of::<RevmBal>(),
    ]
    .into_iter()
    .sum();

    let cached = CachedRevmBal::new(DecodedBal::new(revm_bal, raw_bytes));
    assert!(cached.size() > shallow_estimate);
}
1139
1140 #[tokio::test]
1141 async fn get_bal_uses_cached_revm_bal() {
1142 let fetches = Arc::new(AtomicUsize::default());
1143 let provider = TestBalProvider::new(fetches.clone());
1144 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1145 provider,
1146 EthStateCacheConfig {
1147 max_blocks: 0,
1148 max_receipts: 0,
1149 max_headers: 0,
1150 max_bals: 4,
1151 max_concurrent_db_requests: 1,
1152 max_cached_tx_hashes: 0,
1153 },
1154 Runtime::test(),
1155 );
1156 let block_hash = B256::repeat_byte(0x66);
1157
1158 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1159 assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1160
1161 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1162 }
1163
1164 #[tokio::test]
1165 async fn concurrent_get_bal_requests_share_fetch() {
1166 let fetches = Arc::new(AtomicUsize::default());
1167 let provider = TestBalProvider::new(fetches.clone());
1168 let cache = EthStateCache::<EthPrimitives>::spawn_with(
1169 provider,
1170 EthStateCacheConfig {
1171 max_blocks: 0,
1172 max_receipts: 0,
1173 max_headers: 0,
1174 max_bals: 4,
1175 max_concurrent_db_requests: 1,
1176 max_cached_tx_hashes: 0,
1177 },
1178 Runtime::test(),
1179 );
1180 let block_hash = B256::repeat_byte(0x77);
1181
1182 let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1183
1184 assert!(first.unwrap().is_some());
1185 assert!(second.unwrap().is_some());
1186 assert_eq!(fetches.load(Ordering::SeqCst), 1);
1187 }
1188
1189 #[derive(Clone, Debug, Default)]
1190 struct TestBalProvider {
1191 bal_store: BalStoreHandle,
1192 }
1193
1194 impl TestBalProvider {
1195 fn new(fetches: Arc<AtomicUsize>) -> Self {
1196 Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1197 }
1198 }
1199
1200 impl BalProvider for TestBalProvider {
1201 fn bal_store(&self) -> &BalStoreHandle {
1202 &self.bal_store
1203 }
1204 }
1205
/// BAL store used by the tests; it records how often `revm_bal_by_hash` is invoked.
#[derive(Debug)]
struct TestBalStore {
    /// Shared counter incremented on every `revm_bal_by_hash` call.
    fetches: Arc<AtomicUsize>,
}
1210
1211 impl BalStore for TestBalStore {
1212 fn insert(
1213 &self,
1214 _num_hash: NumHash,
1215 _bal: reth_storage_api::SealedBal,
1216 ) -> ProviderResult<()> {
1217 Ok(())
1218 }
1219
1220 fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1221 Ok(0)
1222 }
1223
1224 fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1225 Ok(block_hashes.iter().map(|_| None).collect())
1226 }
1227
1228 fn revm_bal_by_hash(
1229 &self,
1230 _block_hash: BlockHash,
1231 ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
1232 self.fetches.fetch_add(1, Ordering::SeqCst);
1233 Ok(Some(test_decoded_revm_bal()))
1234 }
1235
1236 fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
1237 Ok(Vec::new())
1238 }
1239
1240 fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1241 reth_storage_api::NoopBalStore.bal_stream()
1242 }
1243 }
1244
1245 impl BlockHashReader for TestBalProvider {
1246 fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
1247 Ok(None)
1248 }
1249
1250 fn canonical_hashes_range(
1251 &self,
1252 _start: BlockNumber,
1253 _end: BlockNumber,
1254 ) -> ProviderResult<Vec<B256>> {
1255 Ok(Vec::new())
1256 }
1257 }
1258
1259 impl BlockNumReader for TestBalProvider {
1260 fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
1261 Ok(reth_chainspec::ChainInfo::default())
1262 }
1263
1264 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1265 Ok(0)
1266 }
1267
1268 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1269 Ok(0)
1270 }
1271
1272 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1273 Ok(None)
1274 }
1275 }
1276
1277 impl HeaderProvider for TestBalProvider {
1278 type Header = Header;
1279
1280 fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1281 Ok(None)
1282 }
1283
1284 fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
1285 Ok(None)
1286 }
1287
1288 fn headers_range(
1289 &self,
1290 _range: impl RangeBounds<BlockNumber>,
1291 ) -> ProviderResult<Vec<Self::Header>> {
1292 Ok(Vec::new())
1293 }
1294
1295 fn sealed_header(
1296 &self,
1297 _number: BlockNumber,
1298 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1299 Ok(None)
1300 }
1301
1302 fn sealed_headers_while(
1303 &self,
1304 _range: impl RangeBounds<BlockNumber>,
1305 _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1306 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1307 Ok(Vec::new())
1308 }
1309 }
1310
1311 impl BlockBodyIndicesProvider for TestBalProvider {
1312 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1313 Ok(None)
1314 }
1315
1316 fn block_body_indices_range(
1317 &self,
1318 _range: RangeInclusive<BlockNumber>,
1319 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1320 Ok(Vec::new())
1321 }
1322 }
1323
1324 impl TransactionsProvider for TestBalProvider {
1325 type Transaction = TransactionSigned;
1326
1327 fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1328 Ok(None)
1329 }
1330
1331 fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1332 Ok(None)
1333 }
1334
1335 fn transaction_by_id_unhashed(
1336 &self,
1337 _id: TxNumber,
1338 ) -> ProviderResult<Option<Self::Transaction>> {
1339 Ok(None)
1340 }
1341
1342 fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1343 Ok(None)
1344 }
1345
1346 fn transaction_by_hash_with_meta(
1347 &self,
1348 _hash: TxHash,
1349 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1350 Ok(None)
1351 }
1352
1353 fn transactions_by_block(
1354 &self,
1355 _block: BlockHashOrNumber,
1356 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1357 Ok(None)
1358 }
1359
1360 fn transactions_by_block_range(
1361 &self,
1362 _range: impl RangeBounds<BlockNumber>,
1363 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1364 Ok(Vec::new())
1365 }
1366
1367 fn transactions_by_tx_range(
1368 &self,
1369 _range: impl RangeBounds<TxNumber>,
1370 ) -> ProviderResult<Vec<Self::Transaction>> {
1371 Ok(Vec::new())
1372 }
1373
1374 fn senders_by_tx_range(
1375 &self,
1376 _range: impl RangeBounds<TxNumber>,
1377 ) -> ProviderResult<Vec<Address>> {
1378 Ok(Vec::new())
1379 }
1380
1381 fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
1382 Ok(None)
1383 }
1384 }
1385
1386 impl ReceiptProvider for TestBalProvider {
1387 type Receipt = Receipt;
1388
1389 fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1390 Ok(None)
1391 }
1392
1393 fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1394 Ok(None)
1395 }
1396
1397 fn receipts_by_block(
1398 &self,
1399 _block: BlockHashOrNumber,
1400 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1401 Ok(None)
1402 }
1403
1404 fn receipts_by_tx_range(
1405 &self,
1406 _range: impl RangeBounds<TxNumber>,
1407 ) -> ProviderResult<Vec<Self::Receipt>> {
1408 Ok(Vec::new())
1409 }
1410
1411 fn receipts_by_block_range(
1412 &self,
1413 _block_range: RangeInclusive<BlockNumber>,
1414 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1415 Ok(Vec::new())
1416 }
1417 }
1418
1419 impl BlockReader for TestBalProvider {
1420 type Block = Block;
1421
1422 fn find_block_by_hash(
1423 &self,
1424 _hash: B256,
1425 _source: BlockSource,
1426 ) -> ProviderResult<Option<Self::Block>> {
1427 Ok(None)
1428 }
1429
1430 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1431 Ok(None)
1432 }
1433
1434 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1435 Ok(None)
1436 }
1437
1438 fn pending_block_and_receipts(
1439 &self,
1440 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1441 Ok(None)
1442 }
1443
1444 fn recovered_block(
1445 &self,
1446 _id: BlockHashOrNumber,
1447 _transaction_kind: TransactionVariant,
1448 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1449 Ok(None)
1450 }
1451
1452 fn sealed_block_with_senders(
1453 &self,
1454 _id: BlockHashOrNumber,
1455 _transaction_kind: TransactionVariant,
1456 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1457 Ok(None)
1458 }
1459
1460 fn block_range(
1461 &self,
1462 _range: RangeInclusive<BlockNumber>,
1463 ) -> ProviderResult<Vec<Self::Block>> {
1464 Ok(Vec::new())
1465 }
1466
1467 fn block_with_senders_range(
1468 &self,
1469 _range: RangeInclusive<BlockNumber>,
1470 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1471 Ok(Vec::new())
1472 }
1473
1474 fn recovered_block_range(
1475 &self,
1476 _range: RangeInclusive<BlockNumber>,
1477 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1478 Ok(Vec::new())
1479 }
1480
1481 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1482 Ok(None)
1483 }
1484 }
1485}