// Source file: reth_rpc_eth_types/cache/mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eips::{eip7928::bal::DecodedBal, BlockHashOrNumber};
7use alloy_primitives::{Address, TxHash, B256};
8use futures::{stream::FuturesOrdered, Stream, StreamExt};
9use reth_chain_state::CanonStateNotification;
10use reth_errors::{ProviderError, ProviderResult};
11use reth_execution_types::Chain;
12use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
13use reth_revm::{
14    bytecode::Bytecode,
15    primitives::{StorageKey, StorageValue},
16    state::bal::{
17        AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
18        BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
19    },
20};
21use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
22use reth_tasks::Runtime;
23use schnellru::{ByLength, Limiter, LruMap};
24use std::{
25    future::Future,
26    pin::Pin,
27    sync::Arc,
28    task::{Context, Poll},
29};
30use tokio::sync::{
31    mpsc::{unbounded_channel, UnboundedSender},
32    oneshot, Semaphore,
33};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
36pub mod config;
37pub mod db;
38pub mod metrics;
39pub mod multi_consumer;
40
41/// The type that can send the response to a requested [`RecoveredBlock`]
42type BlockWithSendersResponseSender<B> =
43    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
44
45/// The type that can send the response to the requested receipts of a block.
46type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
47
48type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;
49
50type CachedBlockAndReceiptsResponseSender<B, R> =
51    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;
52
53/// The type that can send the response to a requested header
54type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
55
56/// The type that can send the response with a chain of cached blocks
57type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
58
59/// The type that can send the response for a transaction hash lookup
60type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
61
62/// The type that can send the response to a requested revm BAL.
63type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;
64
65type BlockLruCache<B, L> =
66    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
67
68type ReceiptsLruCache<R, L> =
69    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
70
71type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
72
73type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
74
75/// Provides async access to cached eth data
76///
77/// This is the frontend for the async caching service which manages cached data on a different
78/// task.
79#[derive(Debug)]
80pub struct EthStateCache<N: NodePrimitives> {
81    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
82}
83
84impl<N: NodePrimitives> Clone for EthStateCache<N> {
85    fn clone(&self) -> Self {
86        Self { to_service: self.to_service.clone() }
87    }
88}
89
90impl<N: NodePrimitives> EthStateCache<N> {
91    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
92    fn create<Provider>(
93        provider: Provider,
94        action_task_spawner: Runtime,
95        config: EthStateCacheConfig,
96    ) -> (Self, EthStateCacheService<Provider, Runtime>)
97    where
98        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
99    {
100        let EthStateCacheConfig {
101            max_blocks,
102            max_receipts,
103            max_headers,
104            max_bals,
105            max_concurrent_db_requests,
106            max_cached_tx_hashes,
107        } = config;
108        let (to_service, rx) = unbounded_channel();
109
110        let service = EthStateCacheService {
111            provider,
112            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
113            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
114            headers_cache: HeaderLruCache::new(max_headers, "headers"),
115            bal_cache: BalLruCache::new(max_bals, "bals"),
116            action_tx: to_service.clone(),
117            action_rx: UnboundedReceiverStream::new(rx),
118            action_task_spawner,
119            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
120            tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
121        };
122        let cache = Self { to_service };
123        (cache, service)
124    }
125
126    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
127    /// spawner.
128    ///
129    /// The cache is memory limited by the given max bytes values.
130    pub fn spawn_with<Provider>(
131        provider: Provider,
132        config: EthStateCacheConfig,
133        executor: Runtime,
134    ) -> Self
135    where
136        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
137            + BalProvider
138            + Clone
139            + Unpin
140            + 'static,
141    {
142        let (this, service) = Self::create(provider, executor.clone(), config);
143        executor.spawn_critical_task("eth state cache", service);
144        this
145    }
146
147    /// Requests the  [`RecoveredBlock`] for the block hash
148    ///
149    /// Returns `None` if the block does not exist.
150    pub async fn get_recovered_block(
151        &self,
152        block_hash: B256,
153    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
154        let (response_tx, rx) = oneshot::channel();
155        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
156        rx.await.map_err(|_| CacheServiceUnavailable)?
157    }
158
159    /// Requests the receipts for the block hash
160    ///
161    /// Returns `None` if the block was not found.
162    pub async fn get_receipts(
163        &self,
164        block_hash: B256,
165    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
166        let (response_tx, rx) = oneshot::channel();
167        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
168        rx.await.map_err(|_| CacheServiceUnavailable)?
169    }
170
171    /// Fetches both receipts and block for the given block hash.
172    pub async fn get_block_and_receipts(
173        &self,
174        block_hash: B256,
175    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
176        let block = self.get_recovered_block(block_hash);
177        let receipts = self.get_receipts(block_hash);
178
179        let (block, receipts) = futures::try_join!(block, receipts)?;
180
181        Ok(block.zip(receipts))
182    }
183
184    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
185    pub async fn get_receipts_and_maybe_block(
186        &self,
187        block_hash: B256,
188    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
189        let (response_tx, rx) = oneshot::channel();
190        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
191
192        let receipts = self.get_receipts(block_hash);
193
194        let (receipts, block) = futures::join!(receipts, rx);
195
196        let block = block.map_err(|_| CacheServiceUnavailable)?;
197        Ok(receipts?.map(|r| (r, block)))
198    }
199
200    /// Retrieves both block and receipts from cache if available.
201    pub async fn maybe_cached_block_and_receipts(
202        &self,
203        block_hash: B256,
204    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
205        let (response_tx, rx) = oneshot::channel();
206        let _ = self
207            .to_service
208            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
209        rx.await.map_err(|_| CacheServiceUnavailable.into())
210    }
211
212    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
213    #[expect(clippy::type_complexity)]
214    pub fn get_receipts_and_maybe_block_stream<'a>(
215        &'a self,
216        hashes: Vec<B256>,
217    ) -> impl Stream<
218        Item = ProviderResult<
219            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
220        >,
221    > + 'a {
222        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
223
224        futures.collect::<FuturesOrdered<_>>()
225    }
226
227    /// Requests the header for the given hash.
228    ///
229    /// Returns an error if the header is not found.
230    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
231        let (response_tx, rx) = oneshot::channel();
232        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
233        rx.await.map_err(|_| CacheServiceUnavailable)?
234    }
235
236    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
237    /// and traversing down through parent hashes. Returns blocks in descending order (newest
238    /// first).
239    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
240    /// cache without making separate database requests.
241    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
242    /// with at least one block.
243    pub async fn get_cached_parent_blocks(
244        &self,
245        block_hash: B256,
246        max_blocks: usize,
247    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
248        let (response_tx, rx) = oneshot::channel();
249        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
250            block_hash,
251            max_blocks,
252            response_tx,
253        });
254
255        let blocks = rx.await.unwrap_or_default();
256        if blocks.is_empty() {
257            None
258        } else {
259            Some(blocks)
260        }
261    }
262
263    /// Looks up a transaction by its hash in the cache index.
264    ///
265    /// Returns the cached block, transaction index, and optionally receipts if the transaction
266    /// is in a cached block.
267    pub async fn get_transaction_by_hash(
268        &self,
269        tx_hash: TxHash,
270    ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
271        let (response_tx, rx) = oneshot::channel();
272        let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
273        rx.await.ok()?
274    }
275
276    /// Requests the revm BAL for the block hash.
277    ///
278    /// Returns `None` if the BAL does not exist.
279    pub async fn get_bal(
280        &self,
281        block_hash: B256,
282    ) -> ProviderResult<Option<Arc<DecodedBal<RevmBal>>>> {
283        let (response_tx, rx) = oneshot::channel();
284        let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
285        rx.await
286            .map_err(|_| CacheServiceUnavailable)?
287            .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
288    }
289}
290/// Thrown when the cache service task dropped.
291#[derive(Debug, thiserror::Error)]
292#[error("cache service task stopped")]
293pub struct CacheServiceUnavailable;
294
295impl From<CacheServiceUnavailable> for ProviderError {
296    fn from(err: CacheServiceUnavailable) -> Self {
297        Self::other(err)
298    }
299}
300
301/// A task that manages caches for data required by the `eth` rpc implementation.
302///
303/// It provides a caching layer on top of the given
304/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
305/// memory in an LRU cache. If the requested data is missing in the cache it is fetched and inserted
306/// into the cache afterwards. While fetching data from disk is sync, this service is async since
307/// requests and data is shared via channels.
308///
309/// This type is an endless future that listens for incoming messages from the user facing
310/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
311/// that does the IO and sends the result back to it. This way the caching service only
312/// handles messages and does LRU lookups and never blocking IO.
313///
314/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
315/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
316/// permits to limit concurrent requests.
317#[must_use = "Type does nothing unless spawned"]
318pub(crate) struct EthStateCacheService<
319    Provider,
320    Tasks,
321    LimitBlocks = ByLength,
322    LimitReceipts = ByLength,
323    LimitHeaders = ByLength,
324    LimitBals = ByLength,
325> where
326    Provider: BlockReader + BalProvider,
327    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
328    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
329    LimitHeaders: Limiter<B256, Provider::Header>,
330    LimitBals: Limiter<B256, CachedRevmBal>,
331{
332    /// The type used to lookup data from disk
333    provider: Provider,
334    /// The LRU cache for full blocks grouped by their block hash.
335    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
336    /// The LRU cache for block receipts grouped by the block hash.
337    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
338    /// The LRU cache for headers.
339    ///
340    /// Headers are cached because they are required to populate the environment for execution
341    /// (evm).
342    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
343    /// The LRU cache for revm BALs grouped by the block hash.
344    bal_cache: BalLruCache<LimitBals>,
345    /// Sender half of the action channel.
346    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
347    /// Receiver half of the action channel.
348    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
349    /// The type that's used to spawn tasks that do the actual work
350    action_task_spawner: Tasks,
351    /// Rate limiter for spawned fetch tasks.
352    ///
353    /// This restricts the max concurrent fetch tasks at the same time.
354    rate_limiter: Arc<Semaphore>,
355    /// LRU index mapping transaction hashes to their block hash and index within the block.
356    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
357}
358
359impl<Provider> EthStateCacheService<Provider, Runtime>
360where
361    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
362{
363    /// Indexes all transactions in a block by transaction hash.
364    fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
365        let block_hash = block.hash();
366        for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
367            self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
368        }
369    }
370
371    /// Removes transaction index entries for a reorged block.
372    fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
373        for tx in block.body().transactions() {
374            self.tx_hash_index.remove(tx.tx_hash());
375        }
376    }
377
378    fn on_new_block(
379        &mut self,
380        block_hash: B256,
381        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
382    ) {
383        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
384            // send the response to queued senders
385            for tx in queued {
386                let _ = tx.send(res.clone());
387            }
388        }
389
390        // cache good block
391        if let Ok(Some(block)) = res {
392            self.full_block_cache.insert(block_hash, block);
393        }
394    }
395
396    fn on_new_receipts(
397        &mut self,
398        block_hash: B256,
399        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
400    ) {
401        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
402            // send the response to queued senders
403            for tx in queued {
404                let _ = tx.send(res.clone());
405            }
406        }
407
408        // cache good receipts
409        if let Ok(Some(receipts)) = res {
410            self.receipts_cache.insert(block_hash, receipts);
411        }
412    }
413
414    fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
415        if let Some(queued) = self.bal_cache.remove(&block_hash) {
416            for tx in queued {
417                let _ = tx.send(res.clone());
418            }
419        }
420
421        if let Ok(Some(bal)) = res {
422            self.bal_cache.insert(block_hash, bal);
423        }
424    }
425
426    fn on_reorg_block(
427        &mut self,
428        block_hash: B256,
429        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
430    ) {
431        let res = res.map(|b| b.map(Arc::new));
432        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
433            // send the response to queued senders
434            for tx in queued {
435                let _ = tx.send(res.clone());
436            }
437        }
438    }
439
440    fn on_reorg_receipts(
441        &mut self,
442        block_hash: B256,
443        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
444    ) {
445        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
446            // send the response to queued senders
447            for tx in queued {
448                let _ = tx.send(res.clone());
449            }
450        }
451    }
452
453    fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
454        if let Some(queued) = self.headers_cache.remove(&block_hash) {
455            // send the response to queued senders
456            for tx in queued {
457                let _ = tx.send(res.clone());
458            }
459        }
460    }
461
462    fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
463        if let Some(queued) = self.bal_cache.remove(&block_hash) {
464            for tx in queued {
465                let _ = tx.send(res.clone());
466            }
467        }
468    }
469
470    /// Shrinks the queues but leaves some space for the next requests
471    fn shrink_queues(&mut self) {
472        let min_capacity = 2;
473        self.full_block_cache.shrink_to(min_capacity);
474        self.receipts_cache.shrink_to(min_capacity);
475        self.headers_cache.shrink_to(min_capacity);
476        self.bal_cache.shrink_to(min_capacity);
477    }
478
479    fn update_cached_metrics(&self) {
480        self.full_block_cache.update_cached_metrics();
481        self.receipts_cache.update_cached_metrics();
482        self.headers_cache.update_cached_metrics();
483        self.bal_cache.update_cached_metrics();
484    }
485}
486
487impl<Provider> Future for EthStateCacheService<Provider, Runtime>
488where
489    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
490{
491    type Output = ();
492
493    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
494        let this = self.get_mut();
495
496        loop {
497            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
498                // shrink queues if we don't have any work to do
499                this.shrink_queues();
500                return Poll::Pending;
501            };
502
503            match action {
504                None => {
505                    unreachable!("can't close")
506                }
507                Some(action) => {
508                    match action {
509                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
510                            let _ =
511                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
512                        }
513                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
514                            let block = this.full_block_cache.get(&block_hash).cloned();
515                            let receipts = this.receipts_cache.get(&block_hash).cloned();
516                            let _ = response_tx.send((block, receipts));
517                        }
518                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
519                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
520                                let _ = response_tx.send(Ok(Some(block)));
521                                continue
522                            }
523
524                            // block is not in the cache, request it if this is the first consumer
525                            if this.full_block_cache.queue(block_hash, response_tx) {
526                                let provider = this.provider.clone();
527                                let action_tx = this.action_tx.clone();
528                                let rate_limiter = this.rate_limiter.clone();
529                                let mut action_sender =
530                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
531                                this.action_task_spawner.spawn_blocking_task(async move {
532                                    // Acquire permit
533                                    let _permit = rate_limiter.acquire().await;
534                                    // Only look in the database to prevent situations where we
535                                    // looking up the tree is blocking
536                                    let block_sender = provider
537                                        .sealed_block_with_senders(
538                                            BlockHashOrNumber::Hash(block_hash),
539                                            TransactionVariant::WithHash,
540                                        )
541                                        .map(|maybe_block| maybe_block.map(Arc::new));
542                                    action_sender.send_block(block_sender);
543                                });
544                            }
545                        }
546                        CacheAction::GetReceipts { block_hash, response_tx } => {
547                            // check if block is cached
548                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
549                                let _ = response_tx.send(Ok(Some(receipts)));
550                                continue
551                            }
552
553                            // block is not in the cache, request it if this is the first consumer
554                            if this.receipts_cache.queue(block_hash, response_tx) {
555                                let provider = this.provider.clone();
556                                let action_tx = this.action_tx.clone();
557                                let rate_limiter = this.rate_limiter.clone();
558                                let mut action_sender =
559                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
560                                this.action_task_spawner.spawn_blocking_task(async move {
561                                    // Acquire permit
562                                    let _permit = rate_limiter.acquire().await;
563                                    let res = provider
564                                        .receipts_by_block(block_hash.into())
565                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
566
567                                    action_sender.send_receipts(res);
568                                });
569                            }
570                        }
571                        CacheAction::GetHeader { block_hash, response_tx } => {
572                            // check if the header is cached
573                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
574                                let _ = response_tx.send(Ok(header));
575                                continue
576                            }
577
578                            // it's possible we have the entire block cached
579                            if let Some(block) = this.full_block_cache.get(&block_hash) {
580                                let _ = response_tx.send(Ok(block.clone_header()));
581                                continue
582                            }
583
584                            // header is not in the cache, request it if this is the first
585                            // consumer
586                            if this.headers_cache.queue(block_hash, response_tx) {
587                                let provider = this.provider.clone();
588                                let action_tx = this.action_tx.clone();
589                                let rate_limiter = this.rate_limiter.clone();
590                                let mut action_sender =
591                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
592                                this.action_task_spawner.spawn_blocking_task(async move {
593                                    // Acquire permit
594                                    let _permit = rate_limiter.acquire().await;
595                                    let header = provider.header(block_hash).and_then(|header| {
596                                        header.ok_or_else(|| {
597                                            ProviderError::HeaderNotFound(block_hash.into())
598                                        })
599                                    });
600                                    action_sender.send_header(header);
601                                });
602                            }
603                        }
604                        CacheAction::GetBal { block_hash, response_tx } => {
605                            if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
606                                let _ = response_tx.send(Ok(Some(bal)));
607                                continue
608                            }
609
610                            if this.bal_cache.queue(block_hash, response_tx) {
611                                let provider = this.provider.clone();
612                                let action_tx = this.action_tx.clone();
613                                let rate_limiter = this.rate_limiter.clone();
614                                let mut action_sender =
615                                    ActionSender::new(CacheKind::Bal, block_hash, action_tx);
616                                this.action_task_spawner.spawn_blocking_task(async move {
617                                    let _permit = rate_limiter.acquire().await;
618                                    let res = provider
619                                        .bal_store()
620                                        .revm_bal_by_hash(block_hash)
621                                        .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
622                                    action_sender.send_bal(res);
623                                });
624                            }
625                        }
626                        CacheAction::ReceiptsResult { block_hash, res } => {
627                            this.on_new_receipts(block_hash, res);
628                        }
629                        CacheAction::BalResult { block_hash, res } => {
630                            this.on_new_bal(block_hash, res);
631                        }
632                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
633                            Ok(Some(block_with_senders)) => {
634                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
635                            }
636                            Ok(None) => {
637                                this.on_new_block(block_hash, Ok(None));
638                            }
639                            Err(e) => {
640                                this.on_new_block(block_hash, Err(e));
641                            }
642                        },
643                        CacheAction::HeaderResult { block_hash, res } => {
644                            let res = *res;
645                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
646                                // send the response to queued senders
647                                for tx in queued {
648                                    let _ = tx.send(res.clone());
649                                }
650                            }
651
652                            // cache good header
653                            if let Ok(data) = res {
654                                this.headers_cache.insert(block_hash, data);
655                            }
656                        }
657                        CacheAction::CacheNewCanonicalChain { chain_change } => {
658                            for block in chain_change.blocks {
659                                // Index transactions before caching the block
660                                this.index_block_transactions(&block);
661                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
662                            }
663
664                            for block_receipts in chain_change.receipts {
665                                this.on_new_receipts(
666                                    block_receipts.block_hash,
667                                    Ok(Some(Arc::new(block_receipts.receipts))),
668                                );
669                            }
670                        }
671                        CacheAction::RemoveReorgedChain { chain_change } => {
672                            for block in chain_change.blocks {
673                                let block_hash = block.hash();
674                                let header = block.clone_header();
675                                // Remove transaction index entries for reorged blocks
676                                this.remove_block_transactions(&block);
677                                this.on_reorg_block(block_hash, Ok(Some(block)));
678                                this.on_reorg_header(block_hash, Ok(header));
679                                this.on_reorg_bal(block_hash, Ok(None));
680                            }
681
682                            for block_receipts in chain_change.receipts {
683                                this.on_reorg_receipts(
684                                    block_receipts.block_hash,
685                                    Ok(Some(Arc::new(block_receipts.receipts))),
686                                );
687                            }
688                        }
689                        CacheAction::GetCachedParentBlocks {
690                            block_hash,
691                            max_blocks,
692                            response_tx,
693                        } => {
694                            let mut blocks = Vec::new();
695                            let mut current_hash = block_hash;
696
697                            // Start with the requested block
698                            while blocks.len() < max_blocks {
699                                if let Some(block) =
700                                    this.full_block_cache.get(&current_hash).cloned()
701                                {
702                                    // Get the parent hash for the next iteration
703                                    current_hash = block.header().parent_hash();
704                                    blocks.push(block);
705                                } else {
706                                    // Break the loop if we can't find the current block
707                                    break;
708                                }
709                            }
710
711                            let _ = response_tx.send(blocks);
712                        }
713                        CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
714                            let result =
715                                this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
716                                    let block = this.full_block_cache.get(block_hash).cloned()?;
717                                    let receipts = this.receipts_cache.get(block_hash).cloned();
718                                    Some(CachedTransaction::new(block, *idx, receipts))
719                                });
720                            let _ = response_tx.send(result);
721                        }
722                    };
723                    this.update_cached_metrics();
724                }
725            }
726        }
727    }
728}
729
730/// All message variants sent through the channel
731enum CacheAction<B: Block, R> {
732    GetBlockWithSenders {
733        block_hash: B256,
734        response_tx: BlockWithSendersResponseSender<B>,
735    },
736    GetHeader {
737        block_hash: B256,
738        response_tx: HeaderResponseSender<B::Header>,
739    },
740    GetReceipts {
741        block_hash: B256,
742        response_tx: ReceiptsResponseSender<R>,
743    },
744    GetBal {
745        block_hash: B256,
746        response_tx: BalResponseSender,
747    },
748    GetCachedBlock {
749        block_hash: B256,
750        response_tx: CachedBlockResponseSender<B>,
751    },
752    GetCachedBlockAndReceipts {
753        block_hash: B256,
754        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
755    },
756    BlockWithSendersResult {
757        block_hash: B256,
758        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
759    },
760    ReceiptsResult {
761        block_hash: B256,
762        res: ProviderResult<Option<Arc<Vec<R>>>>,
763    },
764    HeaderResult {
765        block_hash: B256,
766        res: Box<ProviderResult<B::Header>>,
767    },
768    BalResult {
769        block_hash: B256,
770        res: ProviderResult<Option<CachedRevmBal>>,
771    },
772    CacheNewCanonicalChain {
773        chain_change: ChainChange<B, R>,
774    },
775    RemoveReorgedChain {
776        chain_change: ChainChange<B, R>,
777    },
778    GetCachedParentBlocks {
779        block_hash: B256,
780        max_blocks: usize,
781        response_tx: CachedParentBlocksResponseSender<B>,
782    },
783    /// Look up a transaction's cached data by its hash
784    GetTransactionByHash {
785        tx_hash: TxHash,
786        response_tx: TransactionHashResponseSender<B, R>,
787    },
788}
789
790struct BlockReceipts<R> {
791    block_hash: B256,
792    receipts: Vec<R>,
793}
794
795/// A change of the canonical chain
796struct ChainChange<B: Block, R> {
797    blocks: Vec<RecoveredBlock<B>>,
798    receipts: Vec<BlockReceipts<R>>,
799}
800
801impl<B: Block, R: Clone> ChainChange<B, R> {
802    fn new<N>(chain: Arc<Chain<N>>) -> Self
803    where
804        N: NodePrimitives<Block = B, Receipt = R>,
805    {
806        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
807            .blocks_and_receipts()
808            .map(|(block, receipts)| {
809                let block_receipts =
810                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
811                (block.clone(), block_receipts)
812            })
813            .unzip();
814        Self { blocks, receipts }
815    }
816}
817
/// Names the cache a pending request belongs to.
#[derive(Debug, Clone, Copy)]
enum CacheKind {
    /// The cache of recovered blocks.
    Block,
    /// The cache of block receipts.
    Receipt,
    /// The cache of headers.
    Header,
    /// The cache of BALs.
    Bal,
}
826
827/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
828/// before a result could be sent.
829///
830/// This type wraps a sender and in case the sender is still present on drop emit an error response.
831#[derive(Debug)]
832struct ActionSender<B: Block, R: Send + Sync> {
833    kind: CacheKind,
834    blockhash: B256,
835    tx: Option<UnboundedSender<CacheAction<B, R>>>,
836}
837
838impl<R: Send + Sync, B: Block> ActionSender<B, R> {
839    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
840        Self { kind, blockhash, tx: Some(tx) }
841    }
842
843    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
844        if let Some(tx) = self.tx.take() {
845            let _ = tx.send(CacheAction::BlockWithSendersResult {
846                block_hash: self.blockhash,
847                res: block_sender,
848            });
849        }
850    }
851
852    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
853        if let Some(tx) = self.tx.take() {
854            let _ =
855                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
856        }
857    }
858
859    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
860        if let Some(tx) = self.tx.take() {
861            let _ = tx.send(CacheAction::HeaderResult {
862                block_hash: self.blockhash,
863                res: Box::new(header),
864            });
865        }
866    }
867
868    fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
869        if let Some(tx) = self.tx.take() {
870            let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
871        }
872    }
873}
874impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
875    fn drop(&mut self) {
876        if let Some(tx) = self.tx.take() {
877            let msg = match self.kind {
878                CacheKind::Block => CacheAction::BlockWithSendersResult {
879                    block_hash: self.blockhash,
880                    res: Err(CacheServiceUnavailable.into()),
881                },
882                CacheKind::Receipt => CacheAction::ReceiptsResult {
883                    block_hash: self.blockhash,
884                    res: Err(CacheServiceUnavailable.into()),
885                },
886                CacheKind::Header => CacheAction::HeaderResult {
887                    block_hash: self.blockhash,
888                    res: Box::new(Err(CacheServiceUnavailable.into())),
889                },
890                CacheKind::Bal => CacheAction::BalResult {
891                    block_hash: self.blockhash,
892                    res: Err(CacheServiceUnavailable.into()),
893                },
894            };
895            let _ = tx.send(msg);
896        }
897    }
898}
899
900/// Awaits for new chain events and directly inserts them into the cache so they're available
901/// immediately before they need to be fetched from disk.
902///
903/// Reorged blocks are removed from the cache.
904pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
905    eth_state_cache: EthStateCache<N>,
906    mut events: St,
907) where
908    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
909{
910    while let Some(event) = events.next().await {
911        if let Some(reverted) = event.reverted() {
912            let chain_change = ChainChange::new(reverted);
913
914            let _ =
915                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
916        }
917
918        let chain_change = ChainChange::new(event.committed());
919
920        let _ =
921            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
922    }
923}
924
925/// Cached decoded revm BAL.
926#[derive(Clone, Debug)]
927pub(crate) struct CachedRevmBal(Arc<DecodedBal<RevmBal>>);
928
929impl CachedRevmBal {
930    /// Creates a cached revm BAL from an owned decoded BAL.
931    #[inline]
932    fn new(bal: DecodedBal<RevmBal>) -> Self {
933        Self(Arc::new(bal))
934    }
935}
936
937impl InMemorySize for CachedRevmBal {
938    fn size(&self) -> usize {
939        core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
940    }
941}
942
943fn decoded_revm_bal_size(bal: &DecodedBal<RevmBal>) -> usize {
944    core::mem::size_of::<DecodedBal<RevmBal>>() + bal.as_raw().len() + revm_bal_size(bal.as_bal())
945}
946
947fn revm_bal_size(bal: &RevmBal) -> usize {
948    core::mem::size_of::<RevmBal>() +
949        bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
950        bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
951}
952
953fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
954    revm_account_info_bal_heap_size(&account.account_info) +
955        revm_storage_bal_heap_size(&account.storage)
956}
957
958fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
959    revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
960        revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
961        revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
962}
963
964fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
965    storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
966        storage
967            .storage
968            .values()
969            .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
970            .sum::<usize>()
971}
972
973fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
974where
975    T: PartialEq + Clone,
976    F: FnMut(&T) -> usize,
977{
978    writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
979        writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
980}
981
982fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
983    bytecode.bytes_ref().len()
984}
985
986#[cfg(test)]
987mod tests {
988    use super::*;
989    use alloy_consensus::{transaction::TransactionMeta, Header};
990    use alloy_eips::{BlockHashOrNumber, NumHash};
991    use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
992    use core::ops::{RangeBounds, RangeInclusive};
993    use reth_db_models::StoredBlockBodyIndices;
994    use reth_ethereum_primitives::{
995        Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
996    };
997    use reth_primitives_traits::{RecoveredBlock, SealedHeader};
998    use reth_storage_api::{
999        noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1000        BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1001        TransactionVariant, TransactionsProvider,
1002    };
1003    use std::sync::atomic::{AtomicUsize, Ordering};
1004
1005    fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1006        let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1007            NoopProvider::default(),
1008            Runtime::test(),
1009            EthStateCacheConfig {
1010                max_blocks: 4,
1011                max_receipts: 4,
1012                max_headers: 4,
1013                max_bals: 4,
1014                max_concurrent_db_requests: 1,
1015                max_cached_tx_hashes: 16,
1016            },
1017        );
1018        service
1019    }
1020
1021    fn test_decoded_revm_bal() -> DecodedBal<RevmBal> {
1022        DecodedBal::new(RevmBal::default(), Bytes::from_static(&[0xc0]))
1023    }
1024
1025    fn test_block() -> RecoveredBlock<Block> {
1026        RecoveredBlock::new_unhashed(
1027            Block {
1028                header: Header { number: 1, ..Default::default() },
1029                body: BlockBody {
1030                    transactions: vec![TransactionSigned::new_unhashed(
1031                        Transaction::Legacy(Default::default()),
1032                        Signature::test_signature(),
1033                    )],
1034                    ..Default::default()
1035                },
1036            },
1037            vec![Address::ZERO],
1038        )
1039    }
1040
1041    #[test]
1042    fn reorg_evicts_cached_headers() {
1043        let mut service = test_service();
1044        let block_hash = B256::repeat_byte(0x11);
1045
1046        assert!(service
1047            .headers_cache
1048            .insert(block_hash, Header { number: 42, ..Default::default() }));
1049        assert!(service.headers_cache.get(&block_hash).is_some());
1050
1051        service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
1052
1053        assert!(service.headers_cache.get(&block_hash).is_none());
1054    }
1055
1056    #[test]
1057    fn reorg_forwards_header_to_queued_requests() {
1058        let mut service = test_service();
1059        let block_hash = B256::repeat_byte(0x22);
1060        let (response_tx, mut response_rx) = oneshot::channel();
1061        let header = Header { number: 7, ..Default::default() };
1062
1063        assert!(service.headers_cache.queue(block_hash, response_tx));
1064
1065        service.on_reorg_header(block_hash, Ok(header));
1066
1067        let header =
1068            response_rx.try_recv().expect("queued header response").expect("header result");
1069
1070        assert_eq!(header.number, 7);
1071    }
1072
1073    #[test]
1074    fn reorg_removes_tx_hash_index_entries_unconditionally() {
1075        let mut service = test_service();
1076        let block = test_block();
1077        let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
1078
1079        service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
1080
1081        service.remove_block_transactions(&block);
1082
1083        assert!(service.tx_hash_index.get(&tx_hash).is_none());
1084    }
1085
1086    #[test]
1087    fn reorg_evicts_cached_bal() {
1088        let mut service = test_service();
1089        let block_hash = B256::repeat_byte(0x44);
1090
1091        assert!(service.bal_cache.insert(block_hash, CachedRevmBal::new(test_decoded_revm_bal())));
1092        assert!(service.bal_cache.get(&block_hash).is_some());
1093
1094        service.on_reorg_bal(block_hash, Ok(None));
1095
1096        assert!(service.bal_cache.get(&block_hash).is_none());
1097    }
1098
1099    #[test]
1100    fn reorg_forwards_bal_to_queued_requests() {
1101        let mut service = test_service();
1102        let block_hash = B256::repeat_byte(0x55);
1103        let (response_tx, mut response_rx) = oneshot::channel();
1104        let bal = CachedRevmBal::new(test_decoded_revm_bal());
1105
1106        assert!(service.bal_cache.queue(block_hash, response_tx));
1107
1108        service.on_reorg_bal(block_hash, Ok(Some(bal)));
1109
1110        let bal = response_rx.try_recv().expect("queued BAL response").expect("BAL result");
1111
1112        assert!(bal.is_some());
1113    }
1114
1115    #[test]
1116    fn cached_revm_bal_size_accounts_for_nested_allocations() {
1117        let mut account = RevmAccountBal::default();
1118        account.account_info.nonce.writes.push((1, 1));
1119        account.account_info.balance.writes.push((2, StorageValue::from(1u64)));
1120        account.account_info.code.writes.push((
1121            3,
1122            (B256::repeat_byte(0xaa), Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]))),
1123        ));
1124        account.storage.storage.insert(
1125            StorageKey::from(1u64),
1126            RevmBalWrites::new(vec![(4, StorageValue::from(2u64))]),
1127        );
1128
1129        let mut bal = RevmBal::default();
1130        bal.accounts.insert(Address::ZERO, account);
1131
1132        let raw = Bytes::from_static(&[0xc0, 0x01, 0x02]);
1133        let previous_estimate = core::mem::size_of::<CachedRevmBal>() +
1134            core::mem::size_of::<DecodedBal<RevmBal>>() +
1135            raw.len() +
1136            core::mem::size_of::<RevmBal>();
1137        assert!(CachedRevmBal::new(DecodedBal::new(bal, raw)).size() > previous_estimate);
1138    }
1139
1140    #[tokio::test]
1141    async fn get_bal_uses_cached_revm_bal() {
1142        let fetches = Arc::new(AtomicUsize::default());
1143        let provider = TestBalProvider::new(fetches.clone());
1144        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1145            provider,
1146            EthStateCacheConfig {
1147                max_blocks: 0,
1148                max_receipts: 0,
1149                max_headers: 0,
1150                max_bals: 4,
1151                max_concurrent_db_requests: 1,
1152                max_cached_tx_hashes: 0,
1153            },
1154            Runtime::test(),
1155        );
1156        let block_hash = B256::repeat_byte(0x66);
1157
1158        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1159        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1160
1161        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1162    }
1163
1164    #[tokio::test]
1165    async fn concurrent_get_bal_requests_share_fetch() {
1166        let fetches = Arc::new(AtomicUsize::default());
1167        let provider = TestBalProvider::new(fetches.clone());
1168        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1169            provider,
1170            EthStateCacheConfig {
1171                max_blocks: 0,
1172                max_receipts: 0,
1173                max_headers: 0,
1174                max_bals: 4,
1175                max_concurrent_db_requests: 1,
1176                max_cached_tx_hashes: 0,
1177            },
1178            Runtime::test(),
1179        );
1180        let block_hash = B256::repeat_byte(0x77);
1181
1182        let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1183
1184        assert!(first.unwrap().is_some());
1185        assert!(second.unwrap().is_some());
1186        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1187    }
1188
1189    #[derive(Clone, Debug, Default)]
1190    struct TestBalProvider {
1191        bal_store: BalStoreHandle,
1192    }
1193
1194    impl TestBalProvider {
1195        fn new(fetches: Arc<AtomicUsize>) -> Self {
1196            Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1197        }
1198    }
1199
1200    impl BalProvider for TestBalProvider {
1201        fn bal_store(&self) -> &BalStoreHandle {
1202            &self.bal_store
1203        }
1204    }
1205
1206    #[derive(Debug)]
1207    struct TestBalStore {
1208        fetches: Arc<AtomicUsize>,
1209    }
1210
1211    impl BalStore for TestBalStore {
1212        fn insert(
1213            &self,
1214            _num_hash: NumHash,
1215            _bal: reth_storage_api::SealedBal,
1216        ) -> ProviderResult<()> {
1217            Ok(())
1218        }
1219
1220        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1221            Ok(0)
1222        }
1223
1224        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1225            Ok(block_hashes.iter().map(|_| None).collect())
1226        }
1227
1228        fn revm_bal_by_hash(
1229            &self,
1230            _block_hash: BlockHash,
1231        ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
1232            self.fetches.fetch_add(1, Ordering::SeqCst);
1233            Ok(Some(test_decoded_revm_bal()))
1234        }
1235
1236        fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
1237            Ok(Vec::new())
1238        }
1239
1240        fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1241            reth_storage_api::NoopBalStore.bal_stream()
1242        }
1243    }
1244
1245    impl BlockHashReader for TestBalProvider {
1246        fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
1247            Ok(None)
1248        }
1249
1250        fn canonical_hashes_range(
1251            &self,
1252            _start: BlockNumber,
1253            _end: BlockNumber,
1254        ) -> ProviderResult<Vec<B256>> {
1255            Ok(Vec::new())
1256        }
1257    }
1258
1259    impl BlockNumReader for TestBalProvider {
1260        fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
1261            Ok(reth_chainspec::ChainInfo::default())
1262        }
1263
1264        fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1265            Ok(0)
1266        }
1267
1268        fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1269            Ok(0)
1270        }
1271
1272        fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1273            Ok(None)
1274        }
1275    }
1276
1277    impl HeaderProvider for TestBalProvider {
1278        type Header = Header;
1279
1280        fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1281            Ok(None)
1282        }
1283
1284        fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
1285            Ok(None)
1286        }
1287
1288        fn headers_range(
1289            &self,
1290            _range: impl RangeBounds<BlockNumber>,
1291        ) -> ProviderResult<Vec<Self::Header>> {
1292            Ok(Vec::new())
1293        }
1294
1295        fn sealed_header(
1296            &self,
1297            _number: BlockNumber,
1298        ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1299            Ok(None)
1300        }
1301
1302        fn sealed_headers_while(
1303            &self,
1304            _range: impl RangeBounds<BlockNumber>,
1305            _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1306        ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1307            Ok(Vec::new())
1308        }
1309    }
1310
1311    impl BlockBodyIndicesProvider for TestBalProvider {
1312        fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1313            Ok(None)
1314        }
1315
1316        fn block_body_indices_range(
1317            &self,
1318            _range: RangeInclusive<BlockNumber>,
1319        ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1320            Ok(Vec::new())
1321        }
1322    }
1323
1324    impl TransactionsProvider for TestBalProvider {
1325        type Transaction = TransactionSigned;
1326
1327        fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1328            Ok(None)
1329        }
1330
1331        fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1332            Ok(None)
1333        }
1334
1335        fn transaction_by_id_unhashed(
1336            &self,
1337            _id: TxNumber,
1338        ) -> ProviderResult<Option<Self::Transaction>> {
1339            Ok(None)
1340        }
1341
1342        fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1343            Ok(None)
1344        }
1345
1346        fn transaction_by_hash_with_meta(
1347            &self,
1348            _hash: TxHash,
1349        ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1350            Ok(None)
1351        }
1352
1353        fn transactions_by_block(
1354            &self,
1355            _block: BlockHashOrNumber,
1356        ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1357            Ok(None)
1358        }
1359
1360        fn transactions_by_block_range(
1361            &self,
1362            _range: impl RangeBounds<BlockNumber>,
1363        ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1364            Ok(Vec::new())
1365        }
1366
1367        fn transactions_by_tx_range(
1368            &self,
1369            _range: impl RangeBounds<TxNumber>,
1370        ) -> ProviderResult<Vec<Self::Transaction>> {
1371            Ok(Vec::new())
1372        }
1373
1374        fn senders_by_tx_range(
1375            &self,
1376            _range: impl RangeBounds<TxNumber>,
1377        ) -> ProviderResult<Vec<Address>> {
1378            Ok(Vec::new())
1379        }
1380
1381        fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
1382            Ok(None)
1383        }
1384    }
1385
1386    impl ReceiptProvider for TestBalProvider {
1387        type Receipt = Receipt;
1388
1389        fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1390            Ok(None)
1391        }
1392
1393        fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1394            Ok(None)
1395        }
1396
1397        fn receipts_by_block(
1398            &self,
1399            _block: BlockHashOrNumber,
1400        ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1401            Ok(None)
1402        }
1403
1404        fn receipts_by_tx_range(
1405            &self,
1406            _range: impl RangeBounds<TxNumber>,
1407        ) -> ProviderResult<Vec<Self::Receipt>> {
1408            Ok(Vec::new())
1409        }
1410
1411        fn receipts_by_block_range(
1412            &self,
1413            _block_range: RangeInclusive<BlockNumber>,
1414        ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1415            Ok(Vec::new())
1416        }
1417    }
1418
1419    impl BlockReader for TestBalProvider {
1420        type Block = Block;
1421
1422        fn find_block_by_hash(
1423            &self,
1424            _hash: B256,
1425            _source: BlockSource,
1426        ) -> ProviderResult<Option<Self::Block>> {
1427            Ok(None)
1428        }
1429
1430        fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1431            Ok(None)
1432        }
1433
1434        fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1435            Ok(None)
1436        }
1437
1438        fn pending_block_and_receipts(
1439            &self,
1440        ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1441            Ok(None)
1442        }
1443
1444        fn recovered_block(
1445            &self,
1446            _id: BlockHashOrNumber,
1447            _transaction_kind: TransactionVariant,
1448        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1449            Ok(None)
1450        }
1451
1452        fn sealed_block_with_senders(
1453            &self,
1454            _id: BlockHashOrNumber,
1455            _transaction_kind: TransactionVariant,
1456        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1457            Ok(None)
1458        }
1459
1460        fn block_range(
1461            &self,
1462            _range: RangeInclusive<BlockNumber>,
1463        ) -> ProviderResult<Vec<Self::Block>> {
1464            Ok(Vec::new())
1465        }
1466
1467        fn block_with_senders_range(
1468            &self,
1469            _range: RangeInclusive<BlockNumber>,
1470        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1471            Ok(Vec::new())
1472        }
1473
1474        fn recovered_block_range(
1475            &self,
1476            _range: RangeInclusive<BlockNumber>,
1477        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1478            Ok(Vec::new())
1479        }
1480
1481        fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1482            Ok(None)
1483        }
1484    }
1485}